From 71f63618fb83c8e19ae48d5dc4a6e3927031cc10 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 4 Nov 2025 21:09:38 +0100 Subject: Memory update (#379) * Updated the memory usage of Tasks. Still in Progress. * Merged Task and ServiceTask -> Currently not fully working!!! * Fixed bugs that made the merger between Task and ServiceTask not work well. * Updated jdk version for Dockerfile * Removed ServiceFlavor.java and Task.kt --- .../experiments/base/runner/ScenarioReplayer.kt | 58 +++++++++------------- .../experiments/base/runner/ScenarioRunner.kt | 2 +- 2 files changed, 24 insertions(+), 36 deletions(-) (limited to 'opendc-experiments/opendc-experiments-base/src/main') diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index 6a7c9c55..ec4d30ce 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -34,11 +34,8 @@ import org.opendc.compute.failure.models.FailureModel import org.opendc.compute.simulator.TaskWatcher import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.service.ServiceTask -import org.opendc.compute.simulator.service.TaskNature -import org.opendc.compute.workload.Task import org.opendc.experiments.base.experiment.specs.FailureModelSpec import org.opendc.experiments.base.experiment.specs.createFailureModel -import java.time.Duration import java.time.InstantSource import java.util.Random import kotlin.coroutines.coroutineContext @@ -84,7 +81,7 @@ public class RunningTaskWatcher : TaskWatcher { */ public suspend fun ComputeService.replay( clock: InstantSource, - trace: List, + trace: List, failureModelSpec: FailureModelSpec? = null, seed: Long = 0, submitImmediately: Boolean = false, @@ -99,14 +96,19 @@ public suspend fun ComputeService.replay( try { coroutineScope { + val startTimer = System.currentTimeMillis() + // Start the fault injector failureModel?.start() var simulationOffset = Long.MIN_VALUE - for (entry in trace.sortedBy { it.submissionTime }) { +// val numTasks = trace.size +// var counter = 0 + + for (serviceTask in trace.sortedBy { it.submittedAt }) { val now = clock.millis() - val start = entry.submissionTime + val start = serviceTask.submittedAt // Set the simulationOffset based on the starting time of the first task if (simulationOffset == Long.MIN_VALUE) { @@ -116,42 +118,24 @@ public suspend fun ComputeService.replay( // Delay the task based on the startTime given by the trace. if (!submitImmediately) { delay(max(0, (start - now - simulationOffset))) - entry.deadline -= simulationOffset + serviceTask.deadline -= simulationOffset } - val workload = entry.trace - val meta = mutableMapOf("workload" to workload) +// if (counter % 100000 == 0) { +// val endTimer = System.currentTimeMillis() +// +// println("Submitted $counter / $numTasks") +// println("Finished ${String.format("%.2f", (counter.toDouble() / numTasks) * 100)}% of task submissions") +// println("Simulation has been running for: ${(endTimer - startTimer) / 1000} s") +// println("Simulation time is time: ${now / 1000 / 60 / 60} hours\n") +// } - val nature = TaskNature(entry.deferrable) - - val flavorMeta = mutableMapOf() - - if (entry.cpuCapacity > 0.0) { - flavorMeta["cpu-capacity"] = entry.cpuCapacity - } - if (entry.gpuCapacity > 0.0) { - flavorMeta["gpu-capacity"] = entry.gpuCapacity - } +// counter++ launch { val task = client.newTask( - entry.id, - entry.name, - nature, - Duration.ofMillis(entry.duration), - entry.deadline, - client.newFlavor( - entry.id, - entry.cpuCount, - entry.memCapacity, - entry.gpuCount, - entry.parents, - entry.children, - flavorMeta, - ), - workload, - meta, + serviceTask, ) val taskWatcher = RunningTaskWatcher() @@ -162,6 +146,10 @@ public suspend fun ComputeService.replay( taskWatcher.wait() } } + +// println("All tasks submitted, waiting for completion...") +// val endTimer = System.currentTimeMillis() +// println("Simulation has been running for: ${(endTimer - startTimer) / 1000} s") } yield() } finally { diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index ffa31f57..a29a1dd5 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -103,7 +103,7 @@ public fun runScenario( ) val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction) - val startTimeLong = workload.minOf { it.submissionTime } + val startTimeLong = workload.minOf { it.submittedAt } val startTime = Duration.ofMillis(startTimeLong) val topology = clusterTopology(scenario.topologySpec.pathToFile) -- cgit v1.2.3