summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-23 12:46:33 +0100
committerGitHub <noreply@github.com>2025-01-23 12:46:33 +0100
commitbb945c2fdd7b20898e3dfccbac7da2a427418216 (patch)
treeb08dd7489ad4e5f8f752f0d21f745b91dc7a3d1f
parent9403af12f5d015497894fab5a2dea93eb094ecaf (diff)
Added sampleFraction and submissionTime to the workloadSpec (#295)
* Added sampleFraction and submissionTime to the workloadSpec * Removed commented code
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt31
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt42
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt7
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt3
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt12
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt2
7 files changed, 57 insertions, 42 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index e773ba1d..655bacb9 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -49,10 +49,11 @@ import kotlin.math.roundToLong
*/
public class ComputeWorkloadLoader(
private val pathToFile: File,
- private val checkpointInterval: Long,
- private val checkpointDuration: Long,
- private val checkpointIntervalScaling: Double,
-) : WorkloadLoader() {
+ private val subMissionTime: String? = null,
+ private val checkpointInterval: Long = 0L,
+ private val checkpointDuration: Long = 0L,
+ private val checkpointIntervalScaling: Double = 1.0,
+) : WorkloadLoader(subMissionTime) {
/**
* The logger for this instance.
*/
@@ -160,25 +161,11 @@ public class ComputeWorkloadLoader(
* Load the trace at the specified [pathToFile].
*/
override fun load(): List<Task> {
- val ref =
- cache.compute(pathToFile) { key, oldVal ->
- val inst = oldVal?.get()
- if (inst == null) {
-// val path = baseDir.resolve(key)
+ val trace = Trace.open(pathToFile, "opendc-vm")
+ val fragments = parseFragments(trace)
+ val vms = parseMeta(trace, fragments)
- logger.info { "Loading trace $key at $pathToFile" }
-
- val trace = Trace.open(pathToFile, "opendc-vm")
- val fragments = parseFragments(trace)
- val vms = parseMeta(trace, fragments)
-
- SoftReference(vms)
- } else {
- oldVal
- }
- }
-
- return checkNotNull(ref?.get()) { "Memory pressure" }
+ return vms
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index d121b381..60be9299 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -45,7 +45,7 @@ public data class Task(
val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
- val submissionTime: Instant,
+ var submissionTime: Instant,
val duration: Long,
val trace: TraceWorkload,
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
index f4a93c24..0b551a3c 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
@@ -22,24 +22,56 @@
package org.opendc.compute.workload
import mu.KotlinLogging
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
-public abstract class WorkloadLoader {
+public abstract class WorkloadLoader(private val submissionTime: String? = null) {
private val logger = KotlinLogging.logger {}
+ public fun reScheduleTasks(workload: List<Task>) {
+ if (submissionTime == null) {
+ return
+ }
+
+ val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli()
+ val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli()
+
+ val timeShift = submissionTimeLong - workloadSubmissionTime
+
+ for (task in workload) {
+ task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift)
+ }
+ }
+
public abstract fun load(): List<Task>
/**
* Load the workload at sample tasks until a fraction of the workload is loaded
*/
public fun sampleByLoad(fraction: Double): List<Task> {
- val vms = this.load()
+ val workload = this.load()
+
+ reScheduleTasks(workload)
+
+ if (fraction >= 1.0) {
+ return workload
+ }
+
+ if (fraction <= 0.0) {
+ throw Error("The fraction of tasks to load cannot be 0.0 or lower")
+ }
+
val res = mutableListOf<Task>()
- val totalLoad = vms.sumOf { it.totalLoad }
+ val totalLoad = workload.sumOf { it.totalLoad }
var currentLoad = 0.0
- for (entry in vms) {
+ val shuffledWorkload = workload.shuffled()
+ for (entry in shuffledWorkload) {
val entryLoad = entry.totalLoad
+
+ // TODO: ask Sacheen
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -48,7 +80,7 @@ public abstract class WorkloadLoader {
res += entry
}
- logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+ logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
return res
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
index a3414054..87c7abe9 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt
@@ -32,15 +32,20 @@ import java.io.File
*
* @property pathToFile
* @property type
+ * @property sampleFraction
+ * @property submissionTime
*/
@Serializable
public data class WorkloadSpec(
val pathToFile: String,
val type: WorkloadTypes,
+ val sampleFraction: Double = 1.0,
+ val submissionTime: String? = null,
) {
public val name: String = File(pathToFile).nameWithoutExtension
init {
+ require(sampleFraction > 0) { "The fraction of the tasks can not be 0.0 or lower" }
require(File(pathToFile).exists()) { "The provided path to the workload: $pathToFile does not exist " }
}
}
@@ -65,6 +70,7 @@ public enum class WorkloadTypes {
public fun getWorkloadLoader(
type: WorkloadTypes,
pathToFile: File,
+ submissionTime: String?,
checkpointInterval: Long,
checkpointDuration: Long,
checkpointIntervalScaling: Double,
@@ -73,6 +79,7 @@ public fun getWorkloadLoader(
WorkloadTypes.ComputeWorkload ->
ComputeWorkloadLoader(
pathToFile,
+ submissionTime,
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
index a0263e38..45fedd0d 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
@@ -139,9 +139,6 @@ public suspend fun ComputeService.replay(
// Wait until the task is terminated
taskWatcher.wait()
-
- // Stop the task after reaching the end-time of the virtual machine
-// task.delete()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index cda43eb7..c9c2729d 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -80,24 +80,16 @@ public fun runScenario(
val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L
val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0
-// val workloadLoader =
-// ComputeWorkloadLoader(
-// File(scenario.workloadSpec.pathToFile),
-// checkpointInterval,
-// checkpointDuration,
-// checkpointIntervalScaling,
-// )
-// val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed))
-
val workloadLoader =
getWorkloadLoader(
scenario.workloadSpec.type,
File(scenario.workloadSpec.pathToFile),
+ scenario.workloadSpec.submissionTime,
checkpointInterval,
checkpointDuration,
checkpointIntervalScaling,
)
- val workload = workloadLoader.load()
+ val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction)
val startTimeLong = workload.minOf { it.submissionTime }.toEpochMilli()
val startTime = Duration.ofMillis(startTimeLong)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 8684132c..196d6a93 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -85,7 +85,7 @@ public class OpenDCRunner(
/**
* Helper class to load the workloads.
*/
- private val workloadLoader = ComputeWorkloadLoader(tracePath, 0L, 0L, 0.0)
+ private val workloadLoader = ComputeWorkloadLoader(tracePath)
/**
* The [ForkJoinPool] that is used to execute the simulation jobs.