diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-09-16 18:41:42 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-09-16 18:41:42 +0200 |
| commit | 13a3f376fec17d5dcb60b635414c64a6d6ea3b13 (patch) | |
| tree | 02707cce678452c596424ac04eeece78e9e1a8a1 | |
| parent | a735f1768677fc996da77b239819c55dcd623f5e (diff) | |
updated workflow implementation for performance (#368)
* Updated the workflow system for performance. Added workflow specific tests.
10 files changed, 666 insertions, 94 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index 581c29ba..48765a3b 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -45,11 +45,6 @@ public interface Flavor : Resource { /** * Set of Tasks that need to be finished before this can startAdd commentMore actions */ - public val dependencies: Set<Int> - - /** - * Set of Tasks that need to be finished before this can startAdd commentMore actions - */ public val parents: Set<Int> /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index 8feddf54..557fb760 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -111,7 +111,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { */ private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>(); - private final List<SchedulingRequest> blockedTasks = new ArrayList<>(); + private final Map<Integer, SchedulingRequest> blockedTasks = new HashMap<>(); /** * The active tasks in the system. @@ -418,22 +418,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { SchedulingRequest request = new SchedulingRequest(task, now); ServiceFlavor flavor = task.getFlavor(); - for (int taskId : this.terminatedTasks) { - if (flavor.isInDependencies(taskId)) { - // Terminate task - task.setState(TaskState.TERMINATED); - } - } - - // Remove all completed tasks from the pending dependencies - flavor.updatePendingDependencies(this.completedTasks); - // If there are still pending dependencies, we cannot schedule the task yet - Set<Integer> pendingDependencies = flavor.getDependencies(); - if (!pendingDependencies.isEmpty()) { - // If the task has pending dependencies, we cannot schedule it yet - LOGGER.debug("Task {} has pending dependencies: {}", task.getId(), pendingDependencies); - blockedTasks.add(request); + // If the task has parents, put in blocked tasks + if (!flavor.getParents().isEmpty()) { + blockedTasks.put(task.getId(), request); return null; } @@ -447,51 +435,83 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { return request; } - void addCompletedTask(ServiceTask task) { - int taskId = task.getId(); + void addCompletedTask(ServiceTask completedTask) { + int parentId = completedTask.getId(); + // int taskId = task.getId(); - if (!this.completedTasks.contains(taskId)) { - this.completedTasks.add(taskId); - } - - List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + // if (!this.completedTasks.contains(taskId)) { + // this.completedTasks.add(taskId); + // } - for (SchedulingRequest request : blockedTasks) { - request.getTask().getFlavor().updatePendingDependencies(taskId); + for (int taskId : completedTask.getFlavor().getChildren()) { + SchedulingRequest request = blockedTasks.get(taskId); + if (request != null) { + request.getTask().getFlavor().removeFromParents(parentId); - Set<Integer> pendingDependencies = request.getTask().getFlavor().getDependencies(); + Set<Integer> pendingDependencies = request.getTask().getFlavor().getParents(); - if (pendingDependencies.isEmpty()) { - requestsToRemove.add(request); - taskQueue.add(request); - tasksPending++; + if (pendingDependencies.isEmpty()) { + taskQueue.add(request); + tasksPending++; + blockedTasks.remove(taskId); + } } } - for (SchedulingRequest request : requestsToRemove) { - blockedTasks.remove(request); - } + // List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + // + // for (SchedulingRequest request : blockedTasks) { + // request.getTask().getFlavor().updatePendingDependencies(taskId); + // + // Set<Integer> pendingDependencies = request.getTask().getFlavor().getDependencies(); + // + // if (pendingDependencies.isEmpty()) { + // requestsToRemove.add(request); + // taskQueue.add(request); + // tasksPending++; + // } + // } + // + // for (SchedulingRequest request : requestsToRemove) { + // blockedTasks.remove(request); + // } } void addTerminatedTask(ServiceTask task) { - int taskId = task.getId(); - List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + for (int taskId : task.getFlavor().getChildren()) { + SchedulingRequest request = blockedTasks.get(taskId); + if (request != null) { + ServiceTask childTask = request.getTask(); - if (!this.terminatedTasks.contains(taskId)) { - this.terminatedTasks.add(taskId); - } + childTask.setState(TaskState.TERMINATED); + + this.addTerminatedTask(childTask); - for (SchedulingRequest request : blockedTasks) { - if (request.getTask().getFlavor().isInDependencies(taskId)) { - requestsToRemove.add(request); - request.getTask().setState(TaskState.TERMINATED); + this.setTaskToBeRemoved(childTask); + + blockedTasks.remove(childTask.getId()); } } - for (SchedulingRequest request : requestsToRemove) { - blockedTasks.remove(request); - } + // int taskId = task.getId(); + // + // List<SchedulingRequest> requestsToRemove = new ArrayList<>(); + // + // if (!this.terminatedTasks.contains(taskId)) { + // this.terminatedTasks.add(taskId); + // } + // + // for (SchedulingRequest request : blockedTasks) { + // if (request.getTask().getFlavor().isInDependencies(taskId)) { + // requestsToRemove.add(request); + // request.getTask().setState(TaskState.TERMINATED); + // } + // } + // + // for (SchedulingRequest request : requestsToRemove) { + // blockedTasks.remove(request); + // } } void delete(ServiceFlavor flavor) { @@ -554,6 +574,8 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { task.setState(TaskState.TERMINATED); + this.addTerminatedTask(task); + this.setTaskToBeRemoved(task); continue; } else { diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java index 6201f21f..2d6f0342 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -23,7 +23,6 @@ package org.opendc.compute.simulator.service; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,7 +41,6 @@ public final class ServiceFlavor implements Flavor { private final int gpuCoreCount; private final Set<Integer> parents; private final Set<Integer> children; - private final Set<Integer> dependencies; private final Map<String, ?> meta; ServiceFlavor( @@ -60,7 +58,6 @@ public final class ServiceFlavor implements Flavor { this.memorySize = memorySize; this.gpuCoreCount = gpuCoreCount; this.parents = parents; - this.dependencies = new HashSet<>(parents); this.children = children; this.meta = meta; } @@ -119,23 +116,18 @@ public final class ServiceFlavor implements Flavor { return "Flavor[name=" + taskId + "]"; } - @Override - public @NotNull Set<Integer> getDependencies() { - return dependencies; - } - - public void updatePendingDependencies(List<Integer> completedTasks) { + public void removeFromParents(List<Integer> completedTasks) { for (int task : completedTasks) { - this.updatePendingDependencies(task); + this.removeFromParents(task); } } - public void updatePendingDependencies(int completedTask) { - this.dependencies.remove(completedTask); + public void removeFromParents(int completedTask) { + this.parents.remove(completedTask); } public boolean isInDependencies(int task) { - return this.dependencies.contains(task); + return this.parents.contains(task); } @Override diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index c875b8a2..a988d774 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -40,7 +40,7 @@ public data class Task( val name: String, var submissionTime: Long, val duration: Long, - val parents: Set<Int> = emptySet(), + val parents: Set<Int> = mutableSetOf(), val children: Set<Int> = emptySet(), val cpuCount: Int, val cpuCapacity: Double, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentCliTest.kt index cd16f174..748a020c 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentRunnerTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentCliTest.kt @@ -29,15 +29,15 @@ import java.io.File /** * An integration test suite for the Experiment Runner. */ -class ExperimentRunnerTest { +class ExperimentCliTest { /** - * ExperimentRunner test 1 + * ExperimentCli test 1 * This test runs the experiment defined in the experiment_1.json file. * * In this test, the bitbrains-small workload is executed with and without a carbon trace. */ @Test - fun testExperimentRunner1() { + fun testExperimentCli1() { ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_1.json")) val someDir = File("output") @@ -45,13 +45,13 @@ class ExperimentRunnerTest { } /** - * ExperimentRunner test 2 + * ExperimentCli test 2 * This test runs the experiment defined in the experiment_2.json file. * * In this test, parts of the Marconi 100 workload is executed . This trace contains GPU tasks. */ @Test - fun testExperimentRunner2() { + fun testExperimentCli2() { ExperimentCommand().main(arrayOf("--experiment-path", "src/test/resources/experiments/experiment_2.json")) val someDir = File("output") diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt index 28096bb8..f43dfbb0 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt @@ -36,9 +36,9 @@ import java.util.ArrayList /** * An integration test suite for the Scenario experiments. */ -class ExperimentTest { +class ScenarioRunnerTest { /** - * Simulator test 1: Single Task + * Scenario test 1: Single Task * In this test, a single task is scheduled that takes 10 minutes to run. * * There should be no problems running the task, so the total runtime should be 10 min. @@ -48,7 +48,7 @@ class ExperimentTest { * When the task is failed, all time is idle. */ @Test - fun testSimulator1() { + fun testScenario1() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -76,7 +76,7 @@ class ExperimentTest { } /** - * Simulator test 1: Two Tasks + * Scenario test 1: Two Tasks * In this test, two tasks are scheduled. * * There should be no problems running the task, so the total runtime should be 15 min. @@ -85,7 +85,7 @@ class ExperimentTest { * The second task is using 100% of the available CPU capacity. */ @Test - fun testSimulator2() { + fun testScenario2() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -129,7 +129,7 @@ class ExperimentTest { } /** - * Simulator test 3: Two Tasks, one scheduled later + * Scenario test 3: Two Tasks, one scheduled later * In this test, two tasks are scheduled. * * There should be no problems running the task, so the total runtime should be 15 min. @@ -138,7 +138,7 @@ class ExperimentTest { * The second task is using 100% of the available CPU capacity. */ @Test - fun testSimulator3() { + fun testScenario3() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -174,7 +174,7 @@ class ExperimentTest { } /** - * Simulator test 4: Two Tasks, one scheduled later + * Scenario test 4: Two Tasks, one scheduled later * In this test, two tasks are scheduled. * * There should be no problems running the task, so the total runtime should be 15 min. @@ -183,7 +183,7 @@ class ExperimentTest { * The second task is using 100% of the available CPU capacity. */ @Test - fun testSimulator4() { + fun testScenario4() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -224,12 +224,12 @@ class ExperimentTest { } /** - * Simulator test 5: One Task purely running on GPU + * Scenario test 5: One Task purely running on GPU * * In this test, a single task is scheduled that takes 10 minutes to run. It solely uses the GPU. */ @Test - fun testSimulator5() { + fun testScenario5() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -272,12 +272,12 @@ class ExperimentTest { } /** - * Simulator test 6: One Task running on CPU & GPU + * Scenario test 6: One Task running on CPU & GPU * * In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used and have the same runtime. */ @Test - fun testSimulator6() { + fun testScenario6() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -319,12 +319,12 @@ class ExperimentTest { } /** - * Simulator test 7: One Task running on CPU & GPU + * Scenario test 7: One Task running on CPU & GPU * * In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used. CPU will finish way ahead of the GPU. */ @Test - fun testSimulator7() { + fun testScenario7() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -365,12 +365,12 @@ class ExperimentTest { } /** - * Simulator test 8: One Task running on CPU & GPU + * Scenario test 8: One Task running on CPU & GPU * * In this test, a single task is scheduled that takes 10 minutes to run. CPU & GPU are used. GPU will finish way ahead of the CPU. */ @Test - fun testSimulator8() { + fun testScenario8() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -410,12 +410,12 @@ class ExperimentTest { } /** - * Simulator test 9: Two tasks running on CPU & GPU + * Scenario test 9: Two tasks running on CPU & GPU * * In this test, two tasks are scheduled at the same time that takes 10 minutes to run. CPU & GPU are used. Both resources will finish at the same time. */ @Test - fun testSimulator9() { + fun testScenario9() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -465,12 +465,12 @@ class ExperimentTest { } /** - * Simulator test 10: Two tasks running on CPU & GPU + * Scenario test 10: Two tasks running on CPU & GPU * * In this test, two tasks are scheduled at the same time that takes 10 minutes to run. One task purely uses CPU, one purely GPU. */ @Test - fun testSimulator10() { + fun testScenario10() { val workload: ArrayList<Task> = arrayListOf( createTestTask( diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt index f34160f7..d5e4c925 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -75,6 +75,8 @@ fun createTestTask( checkpointDuration: Long = 0L, checkpointIntervalScaling: Double = 1.0, scalingPolicy: ScalingPolicy = NoDelayScaling(), + parents: Set<Int> = emptySet(), + children: Set<Int> = emptySet(), ): Task { var usedResources = arrayOf<ResourceType>() if (fragments.any { it.cpuUsage > 0.0 }) { @@ -89,8 +91,8 @@ fun createTestTask( name, LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli(), duration, - emptySet(), - emptySet(), + parents, + children, cpuCount, fragments.maxOf { it.cpuUsage }, 1800000.0, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt new file mode 100644 index 00000000..6e149d45 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/WorkflowTest.kt @@ -0,0 +1,561 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.workload.Task +import org.opendc.simulator.compute.workload.trace.TraceFragment +import java.util.ArrayList + +/** + * An integration test suite for the Scenario experiments. + */ +class WorkflowTest { + /** + * Scenario test 1: simple workflow + * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs. + * The tasks are arranged in a diamond shape, where task 0 is the root task, tasks 1 and 2 are the children of task 0, + * and task 3 is the child of tasks 1 and 2. + * + * There should be no problems running the task, so the total runtime should be 30 min because 1 and 2 can be executed + * at the same time. + * + */ + @Test + fun testWorkflow1() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + id = 0, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = emptySet(), + children = mutableSetOf(1, 2), + ), + createTestTask( + id = 1, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 2, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 3, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(1, 2), + children = emptySet(), + ), + ) + + val topology = createTopology("single_2_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(3 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((10 * 45000) + (10 * 30000) + (10 * 45000)).toLong(), + monitor.hostCpuIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((10 * 15000) + (10 * 30000) + (10 * 15000)).toLong(), + monitor.hostCpuActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { assertEquals(7500.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect host energy usage at timestamp 0" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect host energy usage at timestamp 0" } }, + { assertEquals(7500.0, monitor.hostEnergyUsages["H01"]?.get(20)) { "Incorrect host energy usage at timestamp 0" } }, + { + assertEquals( + 600 * 125.0 + 600 * 150.0 + 600 * 125.0, + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect host energy usage" } + }, + { assertEquals(600 * 125.0 + 600 * 150.0 + 600 * 125.0, monitor.energyUsages.sum()) { "Incorrect total energy usage" } }, + ) + } + + /** + * Scenario test 2: simple workflow + * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs. + * The tasks are arranged in a diamond shape, where task 0 is the root task, tasks 1 and 2 are the children of task 0, + * and task 3 is the child of tasks 1 and 2. However, task 2 has a shorter duration than task 1. + * + * There should be no problems running the task, so the total runtime should still be 30 min because + * 3 can only be executed after 1 is finished. + * + */ + @Test + fun testWorkflow2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + id = 0, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = emptySet(), + children = mutableSetOf(1, 2), + ), + createTestTask( + id = 1, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 2, + fragments = + arrayListOf( + TraceFragment(5 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 3, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(1, 2), + children = emptySet(), + ), + ) + + val topology = createTopology("single_2_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(3 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((10 * 45000) + (5 * 30000) + (15 * 45000)).toLong(), + monitor.hostCpuIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((10 * 15000) + (5 * 30000) + (15 * 15000)).toLong(), + monitor.hostCpuActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { + assertEquals( + 7500.0, + monitor.hostEnergyUsages["H01"]?.get(0), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(10), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 7500.0, + monitor.hostEnergyUsages["H01"]?.get(20), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 600 * 125.0 + 300 * 150.0 + 900 * 125.0, + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect host energy usage" } + }, + { + assertEquals( + 600 * 125.0 + 300 * 150.0 + 900 * 125.0, + monitor.energyUsages.sum(), + ) { "Incorrect total energy usage" } + }, + ) + } + + /** + * Scenario test 3: simple workflow with unconnected extra task + * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs. + * However, there is also another task that is not connected to the workflow running. + * + * This means that the workflow cannot be parallelized as the extra task will take up one CPU for 40 minutes. + * + * The total runtime should therefore be 40 minutes. + */ + @Test + fun testWorkflow3() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + id = 0, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = emptySet(), + children = mutableSetOf(1, 2), + ), + createTestTask( + id = 1, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 2, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 3, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(1, 2), + children = emptySet(), + ), + createTestTask( + id = 5, + fragments = + arrayListOf( + TraceFragment(40 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + ), + ) + + val topology = createTopology("single_2_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(4 * 10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((40 * 30000)).toLong(), + monitor.hostCpuIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((40 * 30000)).toLong(), + monitor.hostCpuActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(0), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(10), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(20), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 2400 * 150.0, + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect host energy usage" } + }, + { + assertEquals( + 2400 * 150.0, + monitor.energyUsages.sum(), + ) { "Incorrect total energy usage" } + }, + ) + } + + /** + * Scenario test 4: simple workflow with unconnected extra task + * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs. + * However, there is also another task that is not connected to the workflow running. + * + * This means that the workflow cannot be parallelized for the first 15 minutes as the extra task will take up one CPU. + * After that, the workflow can be parallelized as the extra task is finished. + * + * The total runtime should therefore be 35 minutes. + */ + @Test + fun testWorkflow4() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + id = 0, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = emptySet(), + children = mutableSetOf(1, 2), + ), + createTestTask( + id = 1, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 2, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 3, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(1, 2), + children = emptySet(), + ), + createTestTask( + id = 5, + fragments = + arrayListOf( + TraceFragment(15 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + ), + ) + + val topology = createTopology("single_2_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(35 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((20 * 30000) + (15 * 45000)).toLong(), + monitor.hostCpuIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((20 * 30000) + (15 * 15000)).toLong(), + monitor.hostCpuActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(0), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 9000.0, + monitor.hostEnergyUsages["H01"]?.get(10), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 7500.0, + monitor.hostEnergyUsages["H01"]?.get(20), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 1200 * 150.0 + 900 * 125.0, + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect host energy usage" } + }, + { + assertEquals( + 1200 * 150.0 + 900 * 125.0, + monitor.energyUsages.sum(), + ) { "Incorrect total energy usage" } + }, + ) + } + + /** + * Scenario test 3: workflow for which the first task cannot be scheduled. + * In this test, a simple workflow with 4 tasks is executed on a single host with 2 CPUs. + * However, the first task can not be scheduled on the host due to its high CPU requirement. + * The whole workflow is therefore terminated. + * + * The single unrelated task should still be executed + * + * The total runtime should therefore be 10 minutes. + */ + @Test + fun testWorkflow5() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + id = 0, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 10000.0), + ), + cpuCount = 10, + parents = emptySet(), + children = mutableSetOf(1, 2), + ), + createTestTask( + id = 1, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 2, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(0), + children = mutableSetOf(3), + ), + createTestTask( + id = 3, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + parents = mutableSetOf(1, 2), + children = emptySet(), + ), + createTestTask( + id = 5, + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0), + ), + cpuCount = 1, + ), + ) + + val topology = createTopology("single_2_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((10 * 45000)).toLong(), + monitor.hostCpuIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((10 * 15000)).toLong(), + monitor.hostCpuActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { + assertEquals( + 7500.0, + monitor.hostEnergyUsages["H01"]?.get(0), + ) { "Incorrect host energy usage at timestamp 0" } + }, + { + assertEquals( + 600 * 125.0, + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect host energy usage" } + }, + { + assertEquals( + 600 * 125.0, + monitor.energyUsages.sum(), + ) { "Incorrect total energy usage" } + }, + ) + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt index f661d5a9..0ebac5eb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt @@ -37,7 +37,7 @@ internal data class Task( val memCapacity: Double, val gpuCount: Int = 0, val gpuCapacity: Double = 0.0, - val parents: Set<Int> = emptySet(), + val parents: MutableSet<Int> = mutableSetOf(), val children: Set<Int> = emptySet(), val nature: String? = null, val deadline: Long = -1, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt index 12dc54b7..1b22e2a7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt @@ -170,7 +170,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< localMemCapacity, localGpuCount, localGpuCapacity, - localParents.toSet(), + localParents.toMutableSet(), localChildren.toSet(), localNature, localDeadline, |
