From 0c0cf25616771cd40a9e401edcba4a5e5016f76e Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Wed, 16 Jul 2025 16:56:28 +0200 Subject: Added Workflows (#359) * Implemented Workflows for OpenDC --- .../main/kotlin/org/opendc/compute/api/Flavor.kt | 15 ++++ .../compute/simulator/service/ComputeService.java | 94 ++++++++++++++++++++-- .../compute/simulator/service/ServiceFlavor.java | 40 +++++++++ .../compute/workload/ComputeWorkloadLoader.kt | 18 +++-- .../kotlin/org/opendc/compute/workload/Task.kt | 8 +- .../org/opendc/compute/workload/WorkloadLoader.kt | 4 +- 6 files changed, 161 insertions(+), 18 deletions(-) (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index a54a0130..a15191c6 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -41,4 +41,19 @@ public interface Flavor : Resource { * The amount of gpu cores available to the task. */ public val gpuCoreCount: Int + + /** + * Set of Tasks that need to be finished before this can startAdd commentMore actions + */ + public val dependencies: Set + + /** + * Set of Tasks that need to be finished before this can startAdd commentMore actions + */ + public val parents: Set + + /** + * Set of Tasks that need to be finished before this can startAdd commentMore actions + */ + public val children: Set } diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index 835c7186..8b6bef2c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -119,6 +119,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { */ private final Deque taskQueue = new ArrayDeque<>(); + private final List blockedTasks = new ArrayList<>(); + /** * The active tasks in the system. */ @@ -126,9 +128,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { /** * The active tasks in the system. - * TODO: this is not doing anything, maybe delete it? */ - private final Map completedTasks = new HashMap<>(); + private final List completedTasks = new ArrayList<>(); + + private final List terminatedTasks = new ArrayList<>(); /** * The registered flavors for this compute service. @@ -209,9 +212,11 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { if (newState == TaskState.COMPLETED) { tasksCompleted++; + addCompletedTask(task); } if (newState == TaskState.TERMINATED) { tasksTerminated++; + addTerminatedTask(task); } if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) { @@ -430,17 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { long now = clock.millis(); SchedulingRequest request = new SchedulingRequest(task, now); - if (atFront) { - taskQueue.addFirst(request); - } else { - taskQueue.add(request); + ServiceFlavor flavor = task.getFlavor(); + for (String taskName : this.terminatedTasks) { + if (flavor.isInDependencies(taskName)) { + // Terminate task + task.setState(TaskState.TERMINATED); + } } + + // Remove all completed tasks from the pending dependencies + flavor.updatePendingDependencies(this.completedTasks); + + // If there are still pending dependencies, we cannot schedule the task yet + Set pendingDependencies = flavor.getDependencies(); + if (!pendingDependencies.isEmpty()) { + // If the task has pending dependencies, we cannot schedule it yet + LOGGER.debug("Task {} has pending dependencies: {}", task.getUid(), pendingDependencies); + blockedTasks.add(request); + return null; + } + + // Add the request at the front or the back of the queue + if (atFront) taskQueue.addFirst(request); + else taskQueue.add(request); + tasksPending++; requestSchedulingCycle(); return request; } + void addCompletedTask(ServiceTask task) { + String taskName = task.getName(); + + if (!this.completedTasks.contains(taskName)) { + this.completedTasks.add(taskName); + } + + List requestsToRemove = new ArrayList<>(); + + for (SchedulingRequest request : blockedTasks) { + request.getTask().getFlavor().updatePendingDependencies(taskName); + + Set pendingDependencies = request.getTask().getFlavor().getDependencies(); + + if (pendingDependencies.isEmpty()) { + requestsToRemove.add(request); + taskQueue.add(request); + tasksPending++; + } + } + + for (SchedulingRequest request : requestsToRemove) { + blockedTasks.remove(request); + } + } + + void addTerminatedTask(ServiceTask task) { + String taskName = task.getName(); + + List requestsToRemove = new ArrayList<>(); + + if (!this.terminatedTasks.contains(taskName)) { + this.terminatedTasks.add(taskName); + } + + for (SchedulingRequest request : blockedTasks) { + if (request.getTask().getFlavor().isInDependencies(taskName)) { + requestsToRemove.add(request); + request.getTask().setState(TaskState.TERMINATED); + } + } + + for (SchedulingRequest request : requestsToRemove) { + blockedTasks.remove(request); + } + } + void delete(ServiceFlavor flavor) { flavorById.remove(flavor.getUid()); flavors.remove(flavor); @@ -612,12 +683,19 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { @NotNull public ServiceFlavor newFlavor( - @NotNull String name, int cpuCount, long memorySize, int gpuCoreCount, @NotNull Map meta) { + @NotNull String name, + int cpuCount, + long memorySize, + int gpuCoreCount, + @NotNull Set parents, + @NotNull Set children, + @NotNull Map meta) { checkOpen(); final ComputeService service = this.service; UUID uid = new UUID(service.clock.millis(), service.random.nextLong()); - ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, meta); + ServiceFlavor flavor = + new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, parents, children, meta); // service.flavorById.put(uid, flavor); // service.flavors.add(flavor); diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java index 8a4359b4..bb68d336 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -23,8 +23,11 @@ package org.opendc.compute.simulator.service; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.UUID; import org.jetbrains.annotations.NotNull; import org.opendc.compute.api.Flavor; @@ -39,6 +42,9 @@ public final class ServiceFlavor implements Flavor { private final int cpuCoreCount; private final long memorySize; private final int gpuCoreCount; + private final Set parents; + private final Set children; + private final Set dependencies; private final Map meta; ServiceFlavor( @@ -48,6 +54,8 @@ public final class ServiceFlavor implements Flavor { int cpuCoreCount, long memorySize, int gpuCoreCount, + Set parents, + Set children, Map meta) { this.service = service; this.uid = uid; @@ -55,6 +63,9 @@ public final class ServiceFlavor implements Flavor { this.cpuCoreCount = cpuCoreCount; this.memorySize = memorySize; this.gpuCoreCount = gpuCoreCount; + this.parents = parents; + this.dependencies = new HashSet<>(parents); + this.children = children; this.meta = meta; } @@ -118,4 +129,33 @@ public final class ServiceFlavor implements Flavor { public String toString() { return "Flavor[uid=" + uid + ",name=" + name + "]"; } + + @Override + public @NotNull Set getDependencies() { + return dependencies; + } + + public void updatePendingDependencies(List completedTasks) { + for (String task : completedTasks) { + this.updatePendingDependencies(task); + } + } + + public void updatePendingDependencies(String completedTask) { + this.dependencies.remove(completedTask); + } + + public boolean isInDependencies(String task) { + return this.dependencies.contains(task); + } + + @Override + public @NotNull Set<@NotNull String> getParents() { + return parents; + } + + @Override + public @NotNull Set<@NotNull String> getChildren() { + return children; + } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 7599d4e1..3a0ee3e0 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -29,16 +29,17 @@ import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy import org.opendc.trace.Trace import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceGpuCapacity import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceGpuMemCapacity import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceStateCpuUsage import org.opendc.trace.conv.resourceStateDuration import org.opendc.trace.conv.resourceStateGpuUsage @@ -136,7 +137,8 @@ public class ComputeWorkloadLoader( val memCol = reader.resolve(resourceMemCapacity) val gpuCapacityCol = reader.resolve(resourceGpuCapacity) // Assuming GPU capacity is also present val gpuCoreCountCol = reader.resolve(resourceGpuCount) // Assuming GPU cores are also present - val gpuMemoryCol = reader.resolve(resourceGpuMemCapacity) // Assuming GPU memory is also present + val parentsCol = reader.resolve(resourceParents) + val childrenCol = reader.resolve(resourceChildren) val natureCol = reader.resolve(resourceNature) val deadlineCol = reader.resolve(resourceDeadline) @@ -166,6 +168,10 @@ public class ComputeWorkloadLoader( } val gpuCoreCount = reader.getInt(gpuCoreCountCol) // Default to 0 if not present val gpuMemory = 0L // currently not implemented + + val parents = reader.getSet(parentsCol, String::class.java) // No dependencies in the trace + val children = reader.getSet(childrenCol, String::class.java) // No dependencies in the trace + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) var nature = reader.getString(natureCol) var deadline = reader.getLong(deadlineCol) @@ -181,15 +187,17 @@ public class ComputeWorkloadLoader( Task( uid, id, + submissionTime, + duration, + parents!!, + children!!, cpuCount, cpuCapacity, + totalLoad, memCapacity.roundToLong(), gpuCoreCount, gpuUsage, gpuMemory, - totalLoad, - submissionTime, - duration, nature, deadline, builder.build(), diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index 228b84ed..b1ba4545 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -40,15 +40,17 @@ import java.util.UUID public data class Task( val uid: UUID, val name: String, + var submissionTime: Long, + val duration: Long, + val parents: Set = emptySet(), + val children: Set = emptySet(), val cpuCount: Int, val cpuCapacity: Double, + val totalCpuLoad: Double, val memCapacity: Long, val gpuCount: Int = 0, val gpuCapacity: Double = 0.0, val gpuMemCapacity: Long = 0L, - val totalLoad: Double, - var submissionTime: Long, - val duration: Long, val nature: String?, var deadline: Long, val trace: TraceWorkload, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt index fad4c512..c8b7ecc7 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt @@ -64,7 +64,7 @@ public abstract class WorkloadLoader(private val submissionTime: String? = null) val res = mutableListOf() - val totalLoad = workload.sumOf { it.totalLoad } + val totalLoad = workload.sumOf { it.totalCpuLoad } val desiredLoad = totalLoad * fraction var currentLoad = 0.0 @@ -72,7 +72,7 @@ public abstract class WorkloadLoader(private val submissionTime: String? = null) val entry = workload.random() res += entry - currentLoad += entry.totalLoad + currentLoad += entry.totalCpuLoad } logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } -- cgit v1.2.3