summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt31
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt42
3 files changed, 47 insertions, 28 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index e773ba1d..655bacb9 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -49,10 +49,11 @@ import kotlin.math.roundToLong
*/
public class ComputeWorkloadLoader(
private val pathToFile: File,
- private val checkpointInterval: Long,
- private val checkpointDuration: Long,
- private val checkpointIntervalScaling: Double,
-) : WorkloadLoader() {
+ private val subMissionTime: String? = null,
+ private val checkpointInterval: Long = 0L,
+ private val checkpointDuration: Long = 0L,
+ private val checkpointIntervalScaling: Double = 1.0,
+) : WorkloadLoader(subMissionTime) {
/**
* The logger for this instance.
*/
@@ -160,25 +161,11 @@ public class ComputeWorkloadLoader(
* Load the trace at the specified [pathToFile].
*/
override fun load(): List<Task> {
- val ref =
- cache.compute(pathToFile) { key, oldVal ->
- val inst = oldVal?.get()
- if (inst == null) {
-// val path = baseDir.resolve(key)
+ val trace = Trace.open(pathToFile, "opendc-vm")
+ val fragments = parseFragments(trace)
+ val vms = parseMeta(trace, fragments)
- logger.info { "Loading trace $key at $pathToFile" }
-
- val trace = Trace.open(pathToFile, "opendc-vm")
- val fragments = parseFragments(trace)
- val vms = parseMeta(trace, fragments)
-
- SoftReference(vms)
- } else {
- oldVal
- }
- }
-
- return checkNotNull(ref?.get()) { "Memory pressure" }
+ return vms
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index d121b381..60be9299 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -45,7 +45,7 @@ public data class Task(
val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
- val submissionTime: Instant,
+ var submissionTime: Instant,
val duration: Long,
val trace: TraceWorkload,
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
index f4a93c24..0b551a3c 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
@@ -22,24 +22,56 @@
package org.opendc.compute.workload
import mu.KotlinLogging
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
-public abstract class WorkloadLoader {
+public abstract class WorkloadLoader(private val submissionTime: String? = null) {
private val logger = KotlinLogging.logger {}
+ public fun reScheduleTasks(workload: List<Task>) {
+ if (submissionTime == null) {
+ return
+ }
+
+ val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli()
+ val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli()
+
+ val timeShift = submissionTimeLong - workloadSubmissionTime
+
+ for (task in workload) {
+ task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift)
+ }
+ }
+
public abstract fun load(): List<Task>
/**
* Load the workload at sample tasks until a fraction of the workload is loaded
*/
public fun sampleByLoad(fraction: Double): List<Task> {
- val vms = this.load()
+ val workload = this.load()
+
+ reScheduleTasks(workload)
+
+ if (fraction >= 1.0) {
+ return workload
+ }
+
+ if (fraction <= 0.0) {
+ throw Error("The fraction of tasks to load cannot be 0.0 or lower")
+ }
+
val res = mutableListOf<Task>()
- val totalLoad = vms.sumOf { it.totalLoad }
+ val totalLoad = workload.sumOf { it.totalLoad }
var currentLoad = 0.0
- for (entry in vms) {
+ val shuffledWorkload = workload.shuffled()
+ for (entry in shuffledWorkload) {
val entryLoad = entry.totalLoad
+
+ // TODO: ask Sacheen
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -48,7 +80,7 @@ public abstract class WorkloadLoader {
res += entry
}
- logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+ logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
return res
}