summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt15
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java94
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java40
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt18
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt4
6 files changed, 161 insertions, 18 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
index a54a0130..a15191c6 100644
--- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
+++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
@@ -41,4 +41,19 @@ public interface Flavor : Resource {
* The amount of gpu cores available to the task.
*/
public val gpuCoreCount: Int
+
+ /**
+ * Set of Tasks that need to be finished before this can startAdd commentMore actions
+ */
+ public val dependencies: Set<String>
+
+ /**
+ * Set of Tasks that need to be finished before this can startAdd commentMore actions
+ */
+ public val parents: Set<String>
+
+ /**
+ * Set of Tasks that need to be finished before this can startAdd commentMore actions
+ */
+ public val children: Set<String>
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
index 835c7186..8b6bef2c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
@@ -119,6 +119,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
+ private final List<SchedulingRequest> blockedTasks = new ArrayList<>();
+
/**
* The active tasks in the system.
*/
@@ -126,9 +128,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
/**
* The active tasks in the system.
- * TODO: this is not doing anything, maybe delete it?
*/
- private final Map<ServiceTask, SimHost> completedTasks = new HashMap<>();
+ private final List<String> completedTasks = new ArrayList<>();
+
+ private final List<String> terminatedTasks = new ArrayList<>();
/**
* The registered flavors for this compute service.
@@ -209,9 +212,11 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
if (newState == TaskState.COMPLETED) {
tasksCompleted++;
+ addCompletedTask(task);
}
if (newState == TaskState.TERMINATED) {
tasksTerminated++;
+ addTerminatedTask(task);
}
if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
@@ -430,17 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
long now = clock.millis();
SchedulingRequest request = new SchedulingRequest(task, now);
- if (atFront) {
- taskQueue.addFirst(request);
- } else {
- taskQueue.add(request);
+ ServiceFlavor flavor = task.getFlavor();
+ for (String taskName : this.terminatedTasks) {
+ if (flavor.isInDependencies(taskName)) {
+ // Terminate task
+ task.setState(TaskState.TERMINATED);
+ }
}
+
+ // Remove all completed tasks from the pending dependencies
+ flavor.updatePendingDependencies(this.completedTasks);
+
+ // If there are still pending dependencies, we cannot schedule the task yet
+ Set<String> pendingDependencies = flavor.getDependencies();
+ if (!pendingDependencies.isEmpty()) {
+ // If the task has pending dependencies, we cannot schedule it yet
+ LOGGER.debug("Task {} has pending dependencies: {}", task.getUid(), pendingDependencies);
+ blockedTasks.add(request);
+ return null;
+ }
+
+ // Add the request at the front or the back of the queue
+ if (atFront) taskQueue.addFirst(request);
+ else taskQueue.add(request);
+
tasksPending++;
requestSchedulingCycle();
return request;
}
+ void addCompletedTask(ServiceTask task) {
+ String taskName = task.getName();
+
+ if (!this.completedTasks.contains(taskName)) {
+ this.completedTasks.add(taskName);
+ }
+
+ List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+
+ for (SchedulingRequest request : blockedTasks) {
+ request.getTask().getFlavor().updatePendingDependencies(taskName);
+
+ Set<String> pendingDependencies = request.getTask().getFlavor().getDependencies();
+
+ if (pendingDependencies.isEmpty()) {
+ requestsToRemove.add(request);
+ taskQueue.add(request);
+ tasksPending++;
+ }
+ }
+
+ for (SchedulingRequest request : requestsToRemove) {
+ blockedTasks.remove(request);
+ }
+ }
+
+ void addTerminatedTask(ServiceTask task) {
+ String taskName = task.getName();
+
+ List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+
+ if (!this.terminatedTasks.contains(taskName)) {
+ this.terminatedTasks.add(taskName);
+ }
+
+ for (SchedulingRequest request : blockedTasks) {
+ if (request.getTask().getFlavor().isInDependencies(taskName)) {
+ requestsToRemove.add(request);
+ request.getTask().setState(TaskState.TERMINATED);
+ }
+ }
+
+ for (SchedulingRequest request : requestsToRemove) {
+ blockedTasks.remove(request);
+ }
+ }
+
void delete(ServiceFlavor flavor) {
flavorById.remove(flavor.getUid());
flavors.remove(flavor);
@@ -612,12 +683,19 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
@NotNull
public ServiceFlavor newFlavor(
- @NotNull String name, int cpuCount, long memorySize, int gpuCoreCount, @NotNull Map<String, ?> meta) {
+ @NotNull String name,
+ int cpuCount,
+ long memorySize,
+ int gpuCoreCount,
+ @NotNull Set<String> parents,
+ @NotNull Set<String> children,
+ @NotNull Map<String, ?> meta) {
checkOpen();
final ComputeService service = this.service;
UUID uid = new UUID(service.clock.millis(), service.random.nextLong());
- ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, meta);
+ ServiceFlavor flavor =
+ new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, parents, children, meta);
// service.flavorById.put(uid, flavor);
// service.flavors.add(flavor);
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
index 8a4359b4..bb68d336 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
@@ -23,8 +23,11 @@
package org.opendc.compute.simulator.service;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import org.jetbrains.annotations.NotNull;
import org.opendc.compute.api.Flavor;
@@ -39,6 +42,9 @@ public final class ServiceFlavor implements Flavor {
private final int cpuCoreCount;
private final long memorySize;
private final int gpuCoreCount;
+ private final Set<String> parents;
+ private final Set<String> children;
+ private final Set<String> dependencies;
private final Map<String, ?> meta;
ServiceFlavor(
@@ -48,6 +54,8 @@ public final class ServiceFlavor implements Flavor {
int cpuCoreCount,
long memorySize,
int gpuCoreCount,
+ Set<String> parents,
+ Set<String> children,
Map<String, ?> meta) {
this.service = service;
this.uid = uid;
@@ -55,6 +63,9 @@ public final class ServiceFlavor implements Flavor {
this.cpuCoreCount = cpuCoreCount;
this.memorySize = memorySize;
this.gpuCoreCount = gpuCoreCount;
+ this.parents = parents;
+ this.dependencies = new HashSet<>(parents);
+ this.children = children;
this.meta = meta;
}
@@ -118,4 +129,33 @@ public final class ServiceFlavor implements Flavor {
public String toString() {
return "Flavor[uid=" + uid + ",name=" + name + "]";
}
+
+ @Override
+ public @NotNull Set<String> getDependencies() {
+ return dependencies;
+ }
+
+ public void updatePendingDependencies(List<String> completedTasks) {
+ for (String task : completedTasks) {
+ this.updatePendingDependencies(task);
+ }
+ }
+
+ public void updatePendingDependencies(String completedTask) {
+ this.dependencies.remove(completedTask);
+ }
+
+ public boolean isInDependencies(String task) {
+ return this.dependencies.contains(task);
+ }
+
+ @Override
+ public @NotNull Set<@NotNull String> getParents() {
+ return parents;
+ }
+
+ @Override
+ public @NotNull Set<@NotNull String> getChildren() {
+ return children;
+ }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 7599d4e1..3a0ee3e0 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -29,16 +29,17 @@ import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy
import org.opendc.trace.Trace
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceChildren
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
import org.opendc.trace.conv.resourceDeadline
import org.opendc.trace.conv.resourceDuration
import org.opendc.trace.conv.resourceGpuCapacity
import org.opendc.trace.conv.resourceGpuCount
-import org.opendc.trace.conv.resourceGpuMemCapacity
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
import org.opendc.trace.conv.resourceNature
+import org.opendc.trace.conv.resourceParents
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.conv.resourceStateDuration
import org.opendc.trace.conv.resourceStateGpuUsage
@@ -136,7 +137,8 @@ public class ComputeWorkloadLoader(
val memCol = reader.resolve(resourceMemCapacity)
val gpuCapacityCol = reader.resolve(resourceGpuCapacity) // Assuming GPU capacity is also present
val gpuCoreCountCol = reader.resolve(resourceGpuCount) // Assuming GPU cores are also present
- val gpuMemoryCol = reader.resolve(resourceGpuMemCapacity) // Assuming GPU memory is also present
+ val parentsCol = reader.resolve(resourceParents)
+ val childrenCol = reader.resolve(resourceChildren)
val natureCol = reader.resolve(resourceNature)
val deadlineCol = reader.resolve(resourceDeadline)
@@ -166,6 +168,10 @@ public class ComputeWorkloadLoader(
}
val gpuCoreCount = reader.getInt(gpuCoreCountCol) // Default to 0 if not present
val gpuMemory = 0L // currently not implemented
+
+ val parents = reader.getSet(parentsCol, String::class.java) // No dependencies in the trace
+ val children = reader.getSet(childrenCol, String::class.java) // No dependencies in the trace
+
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
var nature = reader.getString(natureCol)
var deadline = reader.getLong(deadlineCol)
@@ -181,15 +187,17 @@ public class ComputeWorkloadLoader(
Task(
uid,
id,
+ submissionTime,
+ duration,
+ parents!!,
+ children!!,
cpuCount,
cpuCapacity,
+ totalLoad,
memCapacity.roundToLong(),
gpuCoreCount,
gpuUsage,
gpuMemory,
- totalLoad,
- submissionTime,
- duration,
nature,
deadline,
builder.build(),
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index 228b84ed..b1ba4545 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -40,15 +40,17 @@ import java.util.UUID
public data class Task(
val uid: UUID,
val name: String,
+ var submissionTime: Long,
+ val duration: Long,
+ val parents: Set<String> = emptySet(),
+ val children: Set<String> = emptySet(),
val cpuCount: Int,
val cpuCapacity: Double,
+ val totalCpuLoad: Double,
val memCapacity: Long,
val gpuCount: Int = 0,
val gpuCapacity: Double = 0.0,
val gpuMemCapacity: Long = 0L,
- val totalLoad: Double,
- var submissionTime: Long,
- val duration: Long,
val nature: String?,
var deadline: Long,
val trace: TraceWorkload,
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
index fad4c512..c8b7ecc7 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt
@@ -64,7 +64,7 @@ public abstract class WorkloadLoader(private val submissionTime: String? = null)
val res = mutableListOf<Task>()
- val totalLoad = workload.sumOf { it.totalLoad }
+ val totalLoad = workload.sumOf { it.totalCpuLoad }
val desiredLoad = totalLoad * fraction
var currentLoad = 0.0
@@ -72,7 +72,7 @@ public abstract class WorkloadLoader(private val submissionTime: String? = null)
val entry = workload.random()
res += entry
- currentLoad += entry.totalLoad
+ currentLoad += entry.totalCpuLoad
}
logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }