diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-07-16 16:56:28 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-16 16:56:28 +0200 |
| commit | 0c0cf25616771cd40a9e401edcba4a5e5016f76e (patch) | |
| tree | 90fa673939a6c4c53900a6aa6eef073ad2957e34 /opendc-compute/opendc-compute-simulator/src | |
| parent | 089c449762950b4322c04f73ef7fe0e10af615df (diff) | |
Added Workflows (#359)
* Implemented Workflows for OpenDC
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src')
2 files changed, 126 insertions, 8 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index 835c7186..8b6bef2c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -119,6 +119,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { */ private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>(); + private final List<SchedulingRequest> blockedTasks = new ArrayList<>(); + /** * The active tasks in the system. */ @@ -126,9 +128,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { /** * The active tasks in the system. - * TODO: this is not doing anything, maybe delete it? */ - private final Map<ServiceTask, SimHost> completedTasks = new HashMap<>(); + private final List<String> completedTasks = new ArrayList<>(); + + private final List<String> terminatedTasks = new ArrayList<>(); /** * The registered flavors for this compute service. @@ -209,9 +212,11 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { if (newState == TaskState.COMPLETED) { tasksCompleted++; + addCompletedTask(task); } if (newState == TaskState.TERMINATED) { tasksTerminated++; + addTerminatedTask(task); } if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) { @@ -430,17 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { long now = clock.millis(); SchedulingRequest request = new SchedulingRequest(task, now); - if (atFront) { - taskQueue.addFirst(request); - } else { - taskQueue.add(request); + ServiceFlavor flavor = task.getFlavor(); + for (String taskName : this.terminatedTasks) { + if (flavor.isInDependencies(taskName)) { + // Terminate task + task.setState(TaskState.TERMINATED); + } } + + // Remove all completed tasks from the pending dependencies + flavor.updatePendingDependencies(this.completedTasks); + + // If there are still pending dependencies, we cannot schedule the task yet + Set<String> pendingDependencies = flavor.getDependencies(); + if (!pendingDependencies.isEmpty()) { + // If the task has pending dependencies, we cannot schedule it yet + LOGGER.debug("Task {} has pending dependencies: {}", task.getUid(), pendingDependencies); + blockedTasks.add(request); + return null; + } + + // Add the request at the front or the back of the queue + if (atFront) taskQueue.addFirst(request); + else taskQueue.add(request); + tasksPending++; requestSchedulingCycle(); return request; } + void addCompletedTask(ServiceTask task) { + String taskName = task.getName(); + + if (!this.completedTasks.contains(taskName)) { + this.completedTasks.add(taskName); + } + + List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + + for (SchedulingRequest request : blockedTasks) { + request.getTask().getFlavor().updatePendingDependencies(taskName); + + Set<String> pendingDependencies = request.getTask().getFlavor().getDependencies(); + + if (pendingDependencies.isEmpty()) { + requestsToRemove.add(request); + taskQueue.add(request); + tasksPending++; + } + } + + for (SchedulingRequest request : requestsToRemove) { + blockedTasks.remove(request); + } + } + + void addTerminatedTask(ServiceTask task) { + String taskName = task.getName(); + + List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + + if (!this.terminatedTasks.contains(taskName)) { + this.terminatedTasks.add(taskName); + } + + for (SchedulingRequest request : blockedTasks) { + if (request.getTask().getFlavor().isInDependencies(taskName)) { + requestsToRemove.add(request); + request.getTask().setState(TaskState.TERMINATED); + } + } + + for (SchedulingRequest request : requestsToRemove) { + blockedTasks.remove(request); + } + } + void delete(ServiceFlavor flavor) { flavorById.remove(flavor.getUid()); flavors.remove(flavor); @@ -612,12 +683,19 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { @NotNull public ServiceFlavor newFlavor( - @NotNull String name, int cpuCount, long memorySize, int gpuCoreCount, @NotNull Map<String, ?> meta) { + @NotNull String name, + int cpuCount, + long memorySize, + int gpuCoreCount, + @NotNull Set<String> parents, + @NotNull Set<String> children, + @NotNull Map<String, ?> meta) { checkOpen(); final ComputeService service = this.service; UUID uid = new UUID(service.clock.millis(), service.random.nextLong()); - ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, meta); + ServiceFlavor flavor = + new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, parents, children, meta); // service.flavorById.put(uid, flavor); // service.flavors.add(flavor); diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java index 8a4359b4..bb68d336 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -23,8 +23,11 @@ package org.opendc.compute.simulator.service; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.UUID; import org.jetbrains.annotations.NotNull; import org.opendc.compute.api.Flavor; @@ -39,6 +42,9 @@ public final class ServiceFlavor implements Flavor { private final int cpuCoreCount; private final long memorySize; private final int gpuCoreCount; + private final Set<String> parents; + private final Set<String> children; + private final Set<String> dependencies; private final Map<String, ?> meta; ServiceFlavor( @@ -48,6 +54,8 @@ public final class ServiceFlavor implements Flavor { int cpuCoreCount, long memorySize, int gpuCoreCount, + Set<String> parents, + Set<String> children, Map<String, ?> meta) { this.service = service; this.uid = uid; @@ -55,6 +63,9 @@ public final class ServiceFlavor implements Flavor { this.cpuCoreCount = cpuCoreCount; this.memorySize = memorySize; this.gpuCoreCount = gpuCoreCount; + this.parents = parents; + this.dependencies = new HashSet<>(parents); + this.children = children; this.meta = meta; } @@ -118,4 +129,33 @@ public final class ServiceFlavor implements Flavor { public String toString() { return "Flavor[uid=" + uid + ",name=" + name + "]"; } + + @Override + public @NotNull Set<String> getDependencies() { + return dependencies; + } + + public void updatePendingDependencies(List<String> completedTasks) { + for (String task : completedTasks) { + this.updatePendingDependencies(task); + } + } + + public void updatePendingDependencies(String completedTask) { + this.dependencies.remove(completedTask); + } + + public boolean isInDependencies(String task) { + return this.dependencies.contains(task); + } + + @Override + public @NotNull Set<@NotNull String> getParents() { + return parents; + } + + @Override + public @NotNull Set<@NotNull String> getChildren() { + return children; + } } |
