summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt5
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java112
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java18
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentCliTest.kt (renamed from opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt)10
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt (renamed from opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt)42
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt6
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt561
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt2
10 files changed, 666 insertions, 94 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
index 581c29ba..48765a3b 100644
--- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
+++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
@@ -45,11 +45,6 @@ public interface Flavor : Resource {
/**
* Set of Tasks that need to be finished before this can startAdd commentMore actions
*/
- public val dependencies: Set<Int>
-
- /**
- * Set of Tasks that need to be finished before this can startAdd commentMore actions
- */
public val parents: Set<Int>
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
index 8feddf54..557fb760 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
@@ -111,7 +111,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
- private final List<SchedulingRequest> blockedTasks = new ArrayList<>();
+ private final Map<Integer, SchedulingRequest> blockedTasks = new HashMap<>();
/**
* The active tasks in the system.
@@ -418,22 +418,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
SchedulingRequest request = new SchedulingRequest(task, now);
ServiceFlavor flavor = task.getFlavor();
- for (int taskId : this.terminatedTasks) {
- if (flavor.isInDependencies(taskId)) {
- // Terminate task
- task.setState(TaskState.TERMINATED);
- }
- }
-
- // Remove all completed tasks from the pending dependencies
- flavor.updatePendingDependencies(this.completedTasks);
- // If there are still pending dependencies, we cannot schedule the task yet
- Set<Integer> pendingDependencies = flavor.getDependencies();
- if (!pendingDependencies.isEmpty()) {
- // If the task has pending dependencies, we cannot schedule it yet
- LOGGER.debug("Task {} has pending dependencies: {}", task.getId(), pendingDependencies);
- blockedTasks.add(request);
+ // If the task has parents, put in blocked tasks
+ if (!flavor.getParents().isEmpty()) {
+ blockedTasks.put(task.getId(), request);
return null;
}
@@ -447,51 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
return request;
}
- void addCompletedTask(ServiceTask task) {
- int taskId = task.getId();
+ void addCompletedTask(ServiceTask completedTask) {
+ int parentId = completedTask.getId();
+ // int taskId = task.getId();
- if (!this.completedTasks.contains(taskId)) {
- this.completedTasks.add(taskId);
- }
-
- List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+ // if (!this.completedTasks.contains(taskId)) {
+ // this.completedTasks.add(taskId);
+ // }
- for (SchedulingRequest request : blockedTasks) {
- request.getTask().getFlavor().updatePendingDependencies(taskId);
+ for (int taskId : completedTask.getFlavor().getChildren()) {
+ SchedulingRequest request = blockedTasks.get(taskId);
+ if (request != null) {
+ request.getTask().getFlavor().removeFromParents(parentId);
- Set<Integer> pendingDependencies = request.getTask().getFlavor().getDependencies();
+ Set<Integer> pendingDependencies = request.getTask().getFlavor().getParents();
- if (pendingDependencies.isEmpty()) {
- requestsToRemove.add(request);
- taskQueue.add(request);
- tasksPending++;
+ if (pendingDependencies.isEmpty()) {
+ taskQueue.add(request);
+ tasksPending++;
+ blockedTasks.remove(taskId);
+ }
}
}
- for (SchedulingRequest request : requestsToRemove) {
- blockedTasks.remove(request);
- }
+ // List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+ //
+ // for (SchedulingRequest request : blockedTasks) {
+ // request.getTask().getFlavor().updatePendingDependencies(taskId);
+ //
+ // Set<Integer> pendingDependencies = request.getTask().getFlavor().getDependencies();
+ //
+ // if (pendingDependencies.isEmpty()) {
+ // requestsToRemove.add(request);
+ // taskQueue.add(request);
+ // tasksPending++;
+ // }
+ // }
+ //
+ // for (SchedulingRequest request : requestsToRemove) {
+ // blockedTasks.remove(request);
+ // }
}
void addTerminatedTask(ServiceTask task) {
- int taskId = task.getId();
- List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+ for (int taskId : task.getFlavor().getChildren()) {
+ SchedulingRequest request = blockedTasks.get(taskId);
+ if (request != null) {
+ ServiceTask childTask = request.getTask();
- if (!this.terminatedTasks.contains(taskId)) {
- this.terminatedTasks.add(taskId);
- }
+ childTask.setState(TaskState.TERMINATED);
+
+ this.addTerminatedTask(childTask);
- for (SchedulingRequest request : blockedTasks) {
- if (request.getTask().getFlavor().isInDependencies(taskId)) {
- requestsToRemove.add(request);
- request.getTask().setState(TaskState.TERMINATED);
+ this.setTaskToBeRemoved(childTask);
+
+ blockedTasks.remove(childTask.getId());
}
}
- for (SchedulingRequest request : requestsToRemove) {
- blockedTasks.remove(request);
- }
+ // int taskId = task.getId();
+ //
+ // List<SchedulingRequest> requestsToRemove = new ArrayList<>();
+ //
+ // if (!this.terminatedTasks.contains(taskId)) {
+ // this.terminatedTasks.add(taskId);
+ // }
+ //
+ // for (SchedulingRequest request : blockedTasks) {
+ // if (request.getTask().getFlavor().isInDependencies(taskId)) {
+ // requestsToRemove.add(request);
+ // request.getTask().setState(TaskState.TERMINATED);
+ // }
+ // }
+ //
+ // for (SchedulingRequest request : requestsToRemove) {
+ // blockedTasks.remove(request);
+ // }
}
void delete(ServiceFlavor flavor) {
@@ -554,6 +574,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
task.setState(TaskState.TERMINATED);
+ this.addTerminatedTask(task);
+
this.setTaskToBeRemoved(task);
continue;
} else {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
index 6201f21f..2d6f0342 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java
@@ -23,7 +23,6 @@
package org.opendc.compute.simulator.service;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,7 +41,6 @@ public final class ServiceFlavor implements Flavor {
private final int gpuCoreCount;
private final Set<Integer> parents;
private final Set<Integer> children;
- private final Set<Integer> dependencies;
private final Map<String, ?> meta;
ServiceFlavor(
@@ -60,7 +58,6 @@ public final class ServiceFlavor implements Flavor {
this.memorySize = memorySize;
this.gpuCoreCount = gpuCoreCount;
this.parents = parents;
- this.dependencies = new HashSet<>(parents);
this.children = children;
this.meta = meta;
}
@@ -119,23 +116,18 @@ public final class ServiceFlavor implements Flavor {
return "Flavor[name=" + taskId + "]";
}
- @Override
- public @NotNull Set<Integer> getDependencies() {
- return dependencies;
- }
-
- public void updatePendingDependencies(List<Integer> completedTasks) {
+ public void removeFromParents(List<Integer> completedTasks) {
for (int task : completedTasks) {
- this.updatePendingDependencies(task);
+ this.removeFromParents(task);
}
}
- public void updatePendingDependencies(int completedTask) {
- this.dependencies.remove(completedTask);
+ public void removeFromParents(int completedTask) {
+ this.parents.remove(completedTask);
}
public boolean isInDependencies(int task) {
- return this.dependencies.contains(task);
+ return this.parents.contains(task);
}
@Override
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index c875b8a2..a988d774 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -40,7 +40,7 @@ public data class Task(
val name: String,
var submissionTime: Long,
val duration: Long,
- val parents: Set<Int> = emptySet(),
+ val parents: Set<Int> = mutableSetOf(),
val children: Set<Int> = emptySet(),
val cpuCount: Int,
val cpuCapacity: Double,
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentCliTest.kt
index cd16f174..748a020c 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentCliTest.kt
@@ -29,15 +29,15 @@ import java.io.File
/**
* An integration test suite for the Experiment Runner.
*/
-class ExperimentRunnerTest {
+class ExperimentCliTest {
/**
- * ExperimentRunner test 1
+ * ExperimentCli test 1
* This test runs the experiment defined in the experiment_1.json file.
*
* In this test, the bitbrains-small workload is executed with and without a carbon trace.
*/
@Test
- fun testExperimentRunner1() {
+ fun testExperimentCli1() {
ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_1.json"))
val someDir = File("output")
@@ -45,13 +45,13 @@ class ExperimentRunnerTest {
}
/**
- * ExperimentRunner test 2
+ * ExperimentCli test 2
* This test runs the experiment defined in the experiment_2.json file.
*
* In this test, parts of the Marconi 100 workload is executed . This trace contains GPU tasks.
*/
@Test
- fun testExperimentRunner2() {
+ fun testExperimentCli2() {
ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_2.json"))
val someDir = File("output")
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt
index 28096bb8..f43dfbb0 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt
@@ -36,9 +36,9 @@ import java.util.ArrayList
/**
* An integration test suite for the Scenario experiments.
*/
-class ExperimentTest {
+class ScenarioRunnerTest {
/**
- * Simulator test 1: Single Task
+ * Scenario test 1: Single Task
* In this test, a single task is scheduled that takes 10 minutes to run.
*
* There should be no problems running the task, so the total runtime should be 10 min.
@@ -48,7 +48,7 @@ class ExperimentTest {
* When the task is failed, all time is idle.
*/
@Test
- fun testSimulator1() {
+ fun testScenario1() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -76,7 +76,7 @@ class ExperimentTest {
}
/**
- * Simulator test 1: Two Tasks
+ * Scenario test 1: Two Tasks
* In this test, two tasks are scheduled.
*
* There should be no problems running the task, so the total runtime should be 15 min.
@@ -85,7 +85,7 @@ class ExperimentTest {
* The second task is using 100% of the available CPU capacity.
*/
@Test
- fun testSimulator2() {
+ fun testScenario2() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -129,7 +129,7 @@ class ExperimentTest {
}
/**
- * Simulator test 3: Two Tasks, one scheduled later
+ * Scenario test 3: Two Tasks, one scheduled later
* In this test, two tasks are scheduled.
*
* There should be no problems running the task, so the total runtime should be 15 min.
@@ -138,7 +138,7 @@ class ExperimentTest {
* The second task is using 100% of the available CPU capacity.
*/
@Test
- fun testSimulator3() {
+ fun testScenario3() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -174,7 +174,7 @@ class ExperimentTest {
}
/**
- * Simulator test 4: Two Tasks, one scheduled later
+ * Scenario test 4: Two Tasks, one scheduled later
* In this test, two tasks are scheduled.
*
* There should be no problems running the task, so the total runtime should be 15 min.
@@ -183,7 +183,7 @@ class ExperimentTest {
* The second task is using 100% of the available CPU capacity.
*/
@Test
- fun testSimulator4() {
+ fun testScenario4() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -224,12 +224,12 @@ class ExperimentTest {
}
/**
- * Simulator test 5: One Task purely running on GPU
+ * Scenario test 5: One Task purely running on GPU
*
* In this test, a single task is scheduled that takes 10 minutes to run. It solely uses the GPU.
*/
@Test
- fun testSimulator5() {
+ fun testScenario5() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -272,12 +272,12 @@ class ExperimentTest {
}
/**
- * Simulator test 6: One Task running on CPU & GPU
+ * Scenario test 6: One Task running on CPU & GPU
*
* In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used and have the same runtime.
*/
@Test
- fun testSimulator6() {
+ fun testScenario6() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -319,12 +319,12 @@ class ExperimentTest {
}
/**
- * Simulator test 7: One Task running on CPU & GPU
+ * Scenario test 7: One Task running on CPU & GPU
*
* In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used. CPU will finish way ahead of the GPU.
*/
@Test
- fun testSimulator7() {
+ fun testScenario7() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -365,12 +365,12 @@ class ExperimentTest {
}
/**
- * Simulator test 8: One Task running on CPU & GPU
+ * Scenario test 8: One Task running on CPU & GPU
*
* In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used. GPU will finish way ahead of the CPU.
*/
@Test
- fun testSimulator8() {
+ fun testScenario8() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -410,12 +410,12 @@ class ExperimentTest {
}
/**
- * Simulator test 9: Two tasks running on CPU & GPU
+ * Scenario test 9: Two tasks running on CPU & GPU
*
* In this test, two tasks are scheduled at the same time that takes 10 minutes to run. CPU & GPU are used. Both resources will finish at the same time.
*/
@Test
- fun testSimulator9() {
+ fun testScenario9() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
@@ -465,12 +465,12 @@ class ExperimentTest {
}
/**
- * Simulator test 10: Two tasks running on CPU & GPU
+ * Scenario test 10: Two tasks running on CPU & GPU
*
* In this test, two tasks are scheduled at the same time that takes 10 minutes to run. One task purely uses CPU, one purely GPU.
*/
@Test
- fun testSimulator10() {
+ fun testScenario10() {
val workload: ArrayList<Task> =
arrayListOf(
createTestTask(
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
index f34160f7..d5e4c925 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
@@ -75,6 +75,8 @@ fun createTestTask(
checkpointDuration: Long = 0L,
checkpointIntervalScaling: Double = 1.0,
scalingPolicy: ScalingPolicy = NoDelayScaling(),
+ parents: Set<Int> = emptySet(),
+ children: Set<Int> = emptySet(),
): Task {
var usedResources = arrayOf<ResourceType>()
if (fragments.any { it.cpuUsage > 0.0 }) {
@@ -89,8 +91,8 @@ fun createTestTask(
name,
LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli(),
duration,
- emptySet(),
- emptySet(),
+ parents,
+ children,
cpuCount,
fragments.maxOf { it.cpuUsage },
1800000.0,
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt
new file mode 100644
index 00000000..6e149d45
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt
@@ -0,0 +1,561 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.base
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.workload.Task
+import org.opendc.simulator.compute.workload.trace.TraceFragment
+import java.util.ArrayList
+
+/**
+ * An integration test suite for the Scenario experiments.
+ */
+class WorkflowTest {
+ /**
+ * Scenario test 1: simple workflow
+ * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs.
+ * The tasks are arranged in a diamond shape, where task 0 is the root task, tasks 1 and 2 are the children of task 0,
+ * and task 3 is the child of tasks 1 and 2.
+ *
+ * There should be no problems running the task, so the total runtime should be 30 min because 1 and 2 can be executed
+ * at the same time.
+ *
+ */
+ @Test
+ fun testWorkflow1() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ id = 0,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = emptySet(),
+ children = mutableSetOf(1, 2),
+ ),
+ createTestTask(
+ id = 1,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 2,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 3,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(1, 2),
+ children = emptySet(),
+ ),
+ )
+
+ val topology = createTopology("single_2_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(3 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((10 * 45000) + (10 * 30000) + (10 * 45000)).toLong(),
+ monitor.hostCpuIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((10 * 15000) + (10 * 30000) + (10 * 15000)).toLong(),
+ monitor.hostCpuActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ { assertEquals(7500.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect host energy usage at timestamp 0" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect host energy usage at timestamp 0" } },
+ { assertEquals(7500.0, monitor.hostEnergyUsages["H01"]?.get(20)) { "Incorrect host energy usage at timestamp 0" } },
+ {
+ assertEquals(
+ 600 * 125.0 + 600 * 150.0 + 600 * 125.0,
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect host energy usage" }
+ },
+ { assertEquals(600 * 125.0 + 600 * 150.0 + 600 * 125.0, monitor.energyUsages.sum()) { "Incorrect total energy usage" } },
+ )
+ }
+
+ /**
+ * Scenario test 2: simple workflow
+ * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs.
+ * The tasks are arranged in a diamond shape, where task 0 is the root task, tasks 1 and 2 are the children of task 0,
+ * and task 3 is the child of tasks 1 and 2. However, task 2 has a shorter duration than task 1.
+ *
+ * There should be no problems running the task, so the total runtime should still be 30 min because
+ * 3 can only be executed after 1 is finished.
+ *
+ */
+ @Test
+ fun testWorkflow2() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ id = 0,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = emptySet(),
+ children = mutableSetOf(1, 2),
+ ),
+ createTestTask(
+ id = 1,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 2,
+ fragments =
+ arrayListOf(
+ TraceFragment(5 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 3,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(1, 2),
+ children = emptySet(),
+ ),
+ )
+
+ val topology = createTopology("single_2_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(3 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((10 * 45000) + (5 * 30000) + (15 * 45000)).toLong(),
+ monitor.hostCpuIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((10 * 15000) + (5 * 30000) + (15 * 15000)).toLong(),
+ monitor.hostCpuActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ {
+ assertEquals(
+ 7500.0,
+ monitor.hostEnergyUsages["H01"]?.get(0),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(10),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 7500.0,
+ monitor.hostEnergyUsages["H01"]?.get(20),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 600 * 125.0 + 300 * 150.0 + 900 * 125.0,
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect host energy usage" }
+ },
+ {
+ assertEquals(
+ 600 * 125.0 + 300 * 150.0 + 900 * 125.0,
+ monitor.energyUsages.sum(),
+ ) { "Incorrect total energy usage" }
+ },
+ )
+ }
+
+ /**
+ * Scenario test 3: simple workflow with unconnected extra task
+ * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs.
+ * However, there is also another task that is not connected to the workflow running.
+ *
+ * This means that the workflow cannot be parallelized as the extra task will take up one CPU for 40 minutes.
+ *
+ * The total runtime should therefore be 40 minutes.
+ */
+ @Test
+ fun testWorkflow3() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ id = 0,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = emptySet(),
+ children = mutableSetOf(1, 2),
+ ),
+ createTestTask(
+ id = 1,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 2,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 3,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(1, 2),
+ children = emptySet(),
+ ),
+ createTestTask(
+ id = 5,
+ fragments =
+ arrayListOf(
+ TraceFragment(40 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ ),
+ )
+
+ val topology = createTopology("single_2_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(4 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((40 * 30000)).toLong(),
+ monitor.hostCpuIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((40 * 30000)).toLong(),
+ monitor.hostCpuActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(0),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(10),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(20),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 2400 * 150.0,
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect host energy usage" }
+ },
+ {
+ assertEquals(
+ 2400 * 150.0,
+ monitor.energyUsages.sum(),
+ ) { "Incorrect total energy usage" }
+ },
+ )
+ }
+
+ /**
+ * Scenario test 4: simple workflow with unconnected extra task
+ * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs.
+ * However, there is also another task that is not connected to the workflow running.
+ *
+ * This means that the workflow cannot be parallelized for the first 15 minutes as the extra task will take up one CPU.
+ * After that, the workflow can be parallelized as the extra task is finished.
+ *
+ * The total runtime should therefore be 35 minutes.
+ */
+ @Test
+ fun testWorkflow4() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ id = 0,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = emptySet(),
+ children = mutableSetOf(1, 2),
+ ),
+ createTestTask(
+ id = 1,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 2,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 3,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(1, 2),
+ children = emptySet(),
+ ),
+ createTestTask(
+ id = 5,
+ fragments =
+ arrayListOf(
+ TraceFragment(15 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ ),
+ )
+
+ val topology = createTopology("single_2_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(35 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((20 * 30000) + (15 * 45000)).toLong(),
+ monitor.hostCpuIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((20 * 30000) + (15 * 15000)).toLong(),
+ monitor.hostCpuActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(0),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 9000.0,
+ monitor.hostEnergyUsages["H01"]?.get(10),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 7500.0,
+ monitor.hostEnergyUsages["H01"]?.get(20),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 1200 * 150.0 + 900 * 125.0,
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect host energy usage" }
+ },
+ {
+ assertEquals(
+ 1200 * 150.0 + 900 * 125.0,
+ monitor.energyUsages.sum(),
+ ) { "Incorrect total energy usage" }
+ },
+ )
+ }
+
+ /**
+ * Scenario test 3: workflow for which the first task cannot be scheduled.
+ * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs.
+ * However, the first task can not be scheduled on the host due to its high CPU requirement.
+ * The whole workflow is therefore terminated.
+ *
+ * The single unrelated task should still be executed
+ *
+ * The total runtime should therefore be 10 minutes.
+ */
+ @Test
+ fun testWorkflow5() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ id = 0,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 10000.0),
+ ),
+ cpuCount = 10,
+ parents = emptySet(),
+ children = mutableSetOf(1, 2),
+ ),
+ createTestTask(
+ id = 1,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 2,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(0),
+ children = mutableSetOf(3),
+ ),
+ createTestTask(
+ id = 3,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ parents = mutableSetOf(1, 2),
+ children = emptySet(),
+ ),
+ createTestTask(
+ id = 5,
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0),
+ ),
+ cpuCount = 1,
+ ),
+ )
+
+ val topology = createTopology("single_2_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((10 * 45000)).toLong(),
+ monitor.hostCpuIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((10 * 15000)).toLong(),
+ monitor.hostCpuActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ {
+ assertEquals(
+ 7500.0,
+ monitor.hostEnergyUsages["H01"]?.get(0),
+ ) { "Incorrect host energy usage at timestamp 0" }
+ },
+ {
+ assertEquals(
+ 600 * 125.0,
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect host energy usage" }
+ },
+ {
+ assertEquals(
+ 600 * 125.0,
+ monitor.energyUsages.sum(),
+ ) { "Incorrect total energy usage" }
+ },
+ )
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
index f661d5a9..0ebac5eb 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
@@ -37,7 +37,7 @@ internal data class Task(
val memCapacity: Double,
val gpuCount: Int = 0,
val gpuCapacity: Double = 0.0,
- val parents: Set<Int> = emptySet(),
+ val parents: MutableSet<Int> = mutableSetOf(),
val children: Set<Int> = emptySet(),
val nature: String? = null,
val deadline: Long = -1,
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
index 12dc54b7..1b22e2a7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
@@ -170,7 +170,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
localMemCapacity,
localGpuCount,
localGpuCapacity,
- localParents.toSet(),
+ localParents.toMutableSet(),
localChildren.toSet(),
localNature,
localDeadline,