From 13a3f376fec17d5dcb60b635414c64a6d6ea3b13 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 16 Sep 2025 18:41:42 +0200 Subject: updated workflow implementation for performance (#368) * Updated the workflow system for performance. Added workflow specific tests. --- .../compute/simulator/service/ComputeService.java | 112 ++++++++++++--------- .../compute/simulator/service/ServiceFlavor.java | 18 +--- 2 files changed, 72 insertions(+), 58 deletions(-) (limited to 'opendc-compute/opendc-compute-simulator') diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index 8feddf54..557fb760 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -111,7 +111,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { */ private final Deque taskQueue = new ArrayDeque<>(); - private final List blockedTasks = new ArrayList<>(); + private final Map blockedTasks = new HashMap<>(); /** * The active tasks in the system. @@ -418,22 +418,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { SchedulingRequest request = new SchedulingRequest(task, now); ServiceFlavor flavor = task.getFlavor(); - for (int taskId : this.terminatedTasks) { - if (flavor.isInDependencies(taskId)) { - // Terminate task - task.setState(TaskState.TERMINATED); - } - } - - // Remove all completed tasks from the pending dependencies - flavor.updatePendingDependencies(this.completedTasks); - // If there are still pending dependencies, we cannot schedule the task yet - Set pendingDependencies = flavor.getDependencies(); - if (!pendingDependencies.isEmpty()) { - // If the task has pending dependencies, we cannot schedule it yet - LOGGER.debug("Task {} has pending dependencies: {}", task.getId(), pendingDependencies); - blockedTasks.add(request); + // If the task has parents, put in blocked tasks + if (!flavor.getParents().isEmpty()) { + blockedTasks.put(task.getId(), request); return null; } @@ -447,51 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { return request; } - void addCompletedTask(ServiceTask task) { - int taskId = task.getId(); + void addCompletedTask(ServiceTask completedTask) { + int parentId = completedTask.getId(); + // int taskId = task.getId(); - if (!this.completedTasks.contains(taskId)) { - this.completedTasks.add(taskId); - } - - List requestsToRemove = new ArrayList<>(); + // if (!this.completedTasks.contains(taskId)) { + // this.completedTasks.add(taskId); + // } - for (SchedulingRequest request : blockedTasks) { - request.getTask().getFlavor().updatePendingDependencies(taskId); + for (int taskId : completedTask.getFlavor().getChildren()) { + SchedulingRequest request = blockedTasks.get(taskId); + if (request != null) { + request.getTask().getFlavor().removeFromParents(parentId); - Set pendingDependencies = request.getTask().getFlavor().getDependencies(); + Set pendingDependencies = request.getTask().getFlavor().getParents(); - if (pendingDependencies.isEmpty()) { - requestsToRemove.add(request); - taskQueue.add(request); - tasksPending++; + if (pendingDependencies.isEmpty()) { + taskQueue.add(request); + tasksPending++; + blockedTasks.remove(taskId); + } } } - for (SchedulingRequest request : requestsToRemove) { - blockedTasks.remove(request); - } + // List requestsToRemove = new ArrayList<>(); + // + // for (SchedulingRequest request : blockedTasks) { + // request.getTask().getFlavor().updatePendingDependencies(taskId); + // + // Set pendingDependencies = request.getTask().getFlavor().getDependencies(); + // + // if (pendingDependencies.isEmpty()) { + // requestsToRemove.add(request); + // taskQueue.add(request); + // tasksPending++; + // } + // } + // + // for (SchedulingRequest request : requestsToRemove) { + // blockedTasks.remove(request); + // } } void addTerminatedTask(ServiceTask task) { - int taskId = task.getId(); - List requestsToRemove = new ArrayList<>(); + for (int taskId : task.getFlavor().getChildren()) { + SchedulingRequest request = blockedTasks.get(taskId); + if (request != null) { + ServiceTask childTask = request.getTask(); - if (!this.terminatedTasks.contains(taskId)) { - this.terminatedTasks.add(taskId); - } + childTask.setState(TaskState.TERMINATED); + + this.addTerminatedTask(childTask); - for (SchedulingRequest request : blockedTasks) { - if (request.getTask().getFlavor().isInDependencies(taskId)) { - requestsToRemove.add(request); - request.getTask().setState(TaskState.TERMINATED); + this.setTaskToBeRemoved(childTask); + + blockedTasks.remove(childTask.getId()); } } - for (SchedulingRequest request : requestsToRemove) { - blockedTasks.remove(request); - } + // int taskId = task.getId(); + // + // List requestsToRemove = new ArrayList<>(); + // + // if (!this.terminatedTasks.contains(taskId)) { + // this.terminatedTasks.add(taskId); + // } + // + // for (SchedulingRequest request : blockedTasks) { + // if (request.getTask().getFlavor().isInDependencies(taskId)) { + // requestsToRemove.add(request); + // request.getTask().setState(TaskState.TERMINATED); + // } + // } + // + // for (SchedulingRequest request : requestsToRemove) { + // blockedTasks.remove(request); + // } } void delete(ServiceFlavor flavor) { @@ -554,6 +574,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { task.setState(TaskState.TERMINATED); + this.addTerminatedTask(task); + this.setTaskToBeRemoved(task); continue; } else { diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java index 6201f21f..2d6f0342 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -23,7 +23,6 @@ package org.opendc.compute.simulator.service; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,7 +41,6 @@ public final class ServiceFlavor implements Flavor { private final int gpuCoreCount; private final Set parents; private final Set children; - private final Set dependencies; private final Map meta; ServiceFlavor( @@ -60,7 +58,6 @@ public final class ServiceFlavor implements Flavor { this.memorySize = memorySize; this.gpuCoreCount = gpuCoreCount; this.parents = parents; - this.dependencies = new HashSet<>(parents); this.children = children; this.meta = meta; } @@ -119,23 +116,18 @@ public final class ServiceFlavor implements Flavor { return "Flavor[name=" + taskId + "]"; } - @Override - public @NotNull Set getDependencies() { - return dependencies; - } - - public void updatePendingDependencies(List completedTasks) { + public void removeFromParents(List completedTasks) { for (int task : completedTasks) { - this.updatePendingDependencies(task); + this.removeFromParents(task); } } - public void updatePendingDependencies(int completedTask) { - this.dependencies.remove(completedTask); + public void removeFromParents(int completedTask) { + this.parents.remove(completedTask); } public boolean isInDependencies(int task) { - return this.dependencies.contains(task); + return this.parents.contains(task); } @Override -- cgit v1.2.3