Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/test')
19 files changed, 981 insertions, 626 deletions
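Note: the refactored tests in this change call shared helpers (createTestTask, createTopology, runTest) whose definitions live in other files of this commit that are not part of this excerpt. The sketch below is a rough reconstruction of what such helpers might look like, based only on the per-file versions that this diff removes from ExperimentTest.kt and FlowDistributorTest.kt. The resource path, the handling of the checkpointing/failure-model arguments, and the TestComputeMonitor type (assumed to expose the per-host maps such as hostIdleTimes, hostActiveTimes and hostEnergyUsages read by the assertions) are assumptions inferred from the call sites, not the actual shared implementation.

    package org.opendc.experiments.base

    import org.opendc.compute.simulator.provisioner.Provisioner
    import org.opendc.compute.simulator.provisioner.registerComputeMonitor
    import org.opendc.compute.simulator.provisioner.setupComputeService
    import org.opendc.compute.simulator.provisioner.setupHosts
    import org.opendc.compute.simulator.scheduler.FilterScheduler
    import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
    import org.opendc.compute.simulator.scheduler.filters.RamFilter
    import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
    import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
    import org.opendc.compute.simulator.service.ComputeService
    import org.opendc.compute.topology.clusterTopology
    import org.opendc.compute.topology.specs.ClusterSpec
    import org.opendc.compute.workload.Task
    import org.opendc.experiments.base.runner.replay
    import org.opendc.simulator.compute.workload.TraceFragment
    import org.opendc.simulator.compute.workload.TraceWorkload
    import org.opendc.simulator.kotlin.runSimulation
    import java.time.Duration
    import java.time.LocalDateTime
    import java.time.ZoneId
    import java.util.ArrayList
    import java.util.UUID

    // Sketch: build a Task from a name and a list of trace fragments.
    // The constructor arguments mirror the in-file helper removed from FlowDistributorTest.kt.
    // The checkpointInterval/checkpointDuration/checkpointIntervalScaling parameters used by the
    // new checkpointing tests are omitted here; how they are threaded into the workload is not
    // visible in this excerpt.
    fun createTestTask(
        name: String,
        cpuCount: Int = 1,
        cpuCapacity: Double = 0.0,
        memCapacity: Long = 0L,
        submissionTime: String = "1970-01-01T00:00",
        duration: Long = 0L,
        fragments: ArrayList<TraceFragment>,
    ): Task {
        return Task(
            UUID.nameUUIDFromBytes(name.toByteArray()),
            name,
            cpuCount,
            cpuCapacity,
            memCapacity,
            1800000.0,
            LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(),
            duration,
            TraceWorkload(fragments),
        )
    }

    // Sketch: load a topology description from the test resources.
    // The actual resource directory used by the shared helper is not shown in this excerpt.
    fun createTopology(name: String): List<ClusterSpec> {
        val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name"))
        return stream.use { clusterTopology(stream) }
    }

    // Sketch: run a workload on a topology and return the recording monitor.
    // Mirrors the runTest removed from FlowDistributorTest.kt; the new shared version also
    // accepts an optional failure model spec, which the removed tests forwarded to replay(...)
    // via the failureModelSpec parameter.
    fun runTest(
        topology: List<ClusterSpec>,
        workload: ArrayList<Task>,
    ): TestComputeMonitor {
        val monitor = TestComputeMonitor() // assumed to be defined in the (unshown) shared monitor file
        val computeScheduler =
            FilterScheduler(
                filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
                weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
            )
        runSimulation {
            Provisioner(dispatcher, 0L).use { provisioner ->
                provisioner.runSteps(
                    setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
                    registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)),
                    setupHosts(serviceDomain = "compute.opendc.org", topology),
                )
                val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
                service.replay(timeSource, workload)
            }
        }
        return monitor
    }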
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt index 845a8bae..e37bcd4a 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt @@ -23,389 +23,311 @@ package org.opendc.experiments.base import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.simulator.provisioner.Provisioner -import org.opendc.compute.simulator.provisioner.registerComputeMonitor -import org.opendc.compute.simulator.provisioner.setupComputeService -import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.filters.ComputeFilter -import org.opendc.compute.simulator.scheduler.filters.RamFilter -import org.opendc.compute.simulator.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.ComputeMonitor -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.ServiceTableReader -import org.opendc.compute.topology.clusterTopology -import org.opendc.compute.topology.specs.ClusterSpec -import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.Task -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace -import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec -import org.opendc.experiments.base.runner.replay -import org.opendc.simulator.kotlin.runSimulation -import java.io.File -import java.util.Random +import org.opendc.simulator.compute.workload.TraceFragment +import java.util.ArrayList /** * An integration test suite for the Scenario experiments. */ class ExperimentTest { /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestComputeMonitor - - /** - * The [FilterScheduler] to use for all experiments. - */ - private lateinit var computeScheduler: FilterScheduler - - /** - * The [ComputeWorkloadLoader] responsible for loading the traces. - */ - private lateinit var workloadLoader: ComputeWorkloadLoader - - private val basePath = "src/test/resources/Experiment" - - /** - * Set up the experimental environment. - */ - @BeforeEach - fun setUp() { - monitor = TestComputeMonitor() - computeScheduler = - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)), - ) - workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) - } - - /** - * Test a small simulation setup. + * Simulator test 1: Single Task + * In this test, a single task is scheduled that takes 10 minutes to run. + * + * There should be no problems running the task, so the total runtime should be 10 min. + * + * The task is using 50% of the available CPU capacity. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. 
*/ @Test - fun testSingleTask() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + fun testSimulator1() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTaskSingleFailure() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = - TraceBasedFailureModelSpec( - "$basePath/failureTraces/single_failure.parquet", - repeat = false, - ) - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) + val topology = createTopology("single_1_2000.json") - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.setTasksExpected(workload.size) + val monitor = runTest(topology, workload) - service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } + assertAll( + { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals((10 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(600 * 150.0, monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } /** - * Test a small simulation setup. + * Simulator test 1: Two Tasks + * In this test, two tasks are scheduled. + * + * There should be no problems running the task, so the total runtime should be 15 min. + * + * The first task is using 50% of the available CPU capacity. + * The second task is using 100% of the available CPU capacity. */ @Test - fun testSingleTask11Failures() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet") - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } }, - { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + fun testSimulator2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(5 * 60 * 1000, 2000.0, 1), + ), + ), ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTaskCheckpoint() = - runSimulation { - val seed = 1L - workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 1000000L, 1000L, 1.0) - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet") - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } + val topology = createTopology("single_1_2000.json") - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) + val monitor = runTest(topology, workload) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, - { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, - { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } + assertAll( + { assertEquals(15 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals(((10 * 30000) + (5 * 60000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals((600 * 150.0) + (300 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } /** - * Test a small simulation setup. + * Simulator test 3: Two Tasks, one scheduled later + * In this test, two tasks are scheduled. + * + * There should be no problems running the task, so the total runtime should be 15 min. + * + * The first task is using 50% of the available CPU capacity. + * The second task is using 100% of the available CPU capacity. */ @Test - fun testSmall() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("bitbrains-small", 0.25, seed) - val topology = createTopology("single.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) + fun testSimulator3() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) - } + val topology = createTopology("single_2_2000.json") - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) + val monitor = runTest(topology, workload) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } + assertAll( + { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals((600 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + { assertEquals((600 * 150.0), monitor.energyUsages.sum()) { "Incorrect energy usage" } }, + ) + } /** - * Test a large simulation setup. + * Simulator test 4: Two Tasks, one scheduled later + * In this test, two tasks are scheduled. + * + * There should be no problems running the task, so the total runtime should be 15 min. + * + * The first task is using 50% of the available CPU capacity. + * The second task is using 100% of the available CPU capacity. */ @Test - fun testLarge() = - runSimulation { - val seed = 0L - val workload = createTestWorkload("bitbrains-small", 1.0, seed) - val topology = createTopology("multi.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(6.914184592181973E9, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + fun testSimulator4() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(5 * 60 * 1000, 2000.0, 1), + ), + submissionTime = "1970-01-01T00:20", + ), ) - } - /** - * Obtain the trace reader for the test. - */ - private fun createTestWorkload( - traceName: String, - fraction: Double, - seed: Long, - ): List<Task> { - val source = trace(traceName).sampleByLoad(fraction) - return source.resolve(workloadLoader, Random(seed)) + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(25 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000) + (10 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals(((10 * 30000) + (5 * 60000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { + assertEquals( + (600 * 150.0) + (600 * 100.0) + (300 * 200.0), + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect energy usage" } + }, + ) } - /** - * Obtain the topology factory for the test. 
- */ - private fun createTopology(name: String): List<ClusterSpec> { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Experiment/topologies/$name")) - return stream.use { clusterTopology(stream) } - } - - class TestComputeMonitor : ComputeMonitor { - var attemptsSuccess = 0 - var attemptsFailure = 0 - var attemptsError = 0 - var tasksPending = 0 - var tasksActive = 0 - var tasksTerminated = 0 - var tasksCompleted = 0 - - override fun record(reader: ServiceTableReader) { - attemptsSuccess = reader.attemptsSuccess - attemptsFailure = reader.attemptsFailure - attemptsError = 0 - tasksPending = reader.tasksPending - tasksActive = reader.tasksActive - tasksTerminated = reader.tasksTerminated - tasksCompleted = reader.tasksCompleted - } - - var idleTime = 0L - var activeTime = 0L - var stealTime = 0L - var lostTime = 0L - var powerDraw = 0.0 - var energyUsage = 0.0 - var uptime = 0L - - override fun record(reader: HostTableReader) { - idleTime += reader.cpuIdleTime - activeTime += reader.cpuActiveTime - stealTime += reader.cpuStealTime - lostTime += reader.cpuLostTime - powerDraw += reader.powerDraw - energyUsage += reader.energyUsage - uptime += reader.uptime - } - } +// /** +// * Test a small simulation setup. +// */ +// @Test +// fun testSingleTask() = +// runSimulation { +// val seed = 1L +// val workload = createTestWorkload("single_task", 1.0, seed) +// val topology = createTopology("single.json") +// val monitor = monitor +// +// Provisioner(dispatcher, seed).use { provisioner -> +// provisioner.runSteps( +// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), +// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), +// setupHosts(serviceDomain = "compute.opendc.org", topology), +// ) +// +// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! +// service.replay(timeSource, workload, seed = seed) +// } +// +// println( +// "Scheduler " + +// "Success=${monitor.attemptsSuccess} " + +// "Failure=${monitor.attemptsFailure} " + +// "Error=${monitor.attemptsError} " + +// "Pending=${monitor.tasksPending} " + +// "Active=${monitor.tasksActive}", +// ) +// +// // Note that these values have been verified beforehand +// assertAll( +// { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, +// { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, +// { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, +// { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, +// { assertEquals(1200000.0, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } }, +// ) +// } +// +// /** +// * Test a small simulation setup. +// */ +// @Test +// fun testSmall() = +// runSimulation { +// val seed = 1L +// val workload = createTestWorkload("bitbrains-small", 0.25, seed) +// val topology = createTopology("single.json") +// val monitor = monitor +// +// Provisioner(dispatcher, seed).use { provisioner -> +// provisioner.runSteps( +// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), +// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), +// setupHosts(serviceDomain = "compute.opendc.org", topology), +// ) +// +// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+// service.replay(timeSource, workload, seed = seed) +// } +// +// println( +// "Scheduler " + +// "Success=${monitor.attemptsSuccess} " + +// "Failure=${monitor.attemptsFailure} " + +// "Error=${monitor.attemptsError} " + +// "Pending=${monitor.tasksPending} " + +// "Active=${monitor.tasksActive}", +// ) +// +// // Note that these values have been verified beforehand +// assertAll( +// { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } }, +// { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } }, +// { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, +// { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, +// { assertEquals(6.7565629E8, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } }, +// ) +// } +// +// /** +// * Test a large simulation setup. +// */ +// @Test +// fun testLarge() = +// runSimulation { +// val seed = 0L +// val workload = createTestWorkload("bitbrains-small", 1.0, seed) +// val topology = createTopology("multi.json") +// val monitor = monitor +// +// Provisioner(dispatcher, seed).use { provisioner -> +// provisioner.runSteps( +// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), +// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), +// setupHosts(serviceDomain = "compute.opendc.org", topology), +// ) +// +// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! +// service.replay(timeSource, workload, seed = seed) +// } +// +// println( +// "Scheduler " + +// "Success=${monitor.attemptsSuccess} " + +// "Failure=${monitor.attemptsFailure} " + +// "Error=${monitor.attemptsError} " + +// "Pending=${monitor.tasksPending} " + +// "Active=${monitor.tasksActive}", +// ) +// +// // Note that these values have been verified beforehand +// assertAll( +// { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, +// { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, +// { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, +// { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, +// { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, +// { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, +// { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } }, +// { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } }, +// { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, +// { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, +// { assertEquals(6.914184592181973E9, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } }, +// ) +// } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt new file mode 100644 index 00000000..90737ab6 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt @@ -0,0 +1,419 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the 
rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.workload.Task +import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec +import org.opendc.simulator.compute.workload.TraceFragment +import java.util.ArrayList + +/** + * An integration test suite for the Scenario experiments. + */ +class FailuresAndCheckpointingTest { + /** + * Failure test 1: Single Task, Single Failure + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. + */ + @Test + fun testFailures1() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/single_failure.parquet", + repeat = false, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals(20 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((15 * 30000) + (5 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals((15 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, + { assertEquals((15 * 60 * 150.0) + (5 * 60 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Failure test 2: Single Task, Failure much later + * In this test, a single task is scheduled, with a failure trace. + * + * However, the first failure occurs after 500 min and should thus not affect the Task. 
+ */ + @Test + fun testFailures2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/single_failure_2.parquet", + repeat = false, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals((10 * 30000).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals((10 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals((600 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Failure test 3: Single Task, Single Failure + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. + */ + @Test + fun testFailures3() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/two_failures.parquet", + repeat = false, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals(37 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((22 * 30000) + (15 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals((22 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, + { assertEquals((22 * 60 * 150.0) + (15 * 60 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Failure test 4: Single Task, repeated failure + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. 
+ */ + @Test + fun testFailures4() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/single_failure.parquet", + repeat = true, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals(95 * 60000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((50 * 60000) + (20 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals((25 * 60000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(15)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(20)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(25)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(30)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(35)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(40)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(45)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(50)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(55)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(60)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(65)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(70)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(75)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(80)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(85)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(90)) { "Incorrect energy usage" } }, + { assertEquals(0.0, monitor.hostEnergyUsages["H01"]?.get(95)) { "Incorrect energy usage" } }, + { assertEquals((10 * 300 * 150.0) + (9 * 300 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Failure test 1: Single Task with checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. 
+ */ + @Test + fun testCheckpoints1() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + ), + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals((10 * 60000) + (9 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals(((10 * 30000) + (9 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals((10 * 60 * 150.0) + (9 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Failure test 2: Single Task with scaling checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. + */ + @Test + fun testCheckpoints2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + checkpointIntervalScaling = 1.5, + ), + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals((10 * 60000) + (4 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, + { assertEquals(((10 * 30000) + (4 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, + { assertEquals((10 * 60 * 150.0) + (4 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Checkpoint test 3: Single Task, single failure with checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. 
+ * + */ + @Test + fun testCheckpoints3() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/single_failure.parquet", + repeat = false, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals((960 * 1000) + 5000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((300 * 1000) + (296 * 500) + (360 * 500)).toLong(), + monitor.hostIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((296 * 500) + 4000 + (360 * 500) + 5000).toLong(), + monitor.hostActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, + { + assertEquals( + (296 * 150.0) + (4 * 200.0) + (300 * 100.0) + + (360 * 150.0) + (5 * 200.0), + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect energy usage" } + }, + ) + } + + /** + * Checkpoint test 4: Single Task, repeated failure with checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + */ + @Test + fun testCheckpoints4() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + ), + ) + + val failureModelSpec = + TraceBasedFailureModelSpec( + "src/test/resources/failureTraces/single_failure.parquet", + repeat = true, + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload, failureModelSpec) + + assertAll( + { assertEquals((22 * 60000) + 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, + { + assertEquals( + ((10 * 60000) + (2 * 296 * 500) + (120 * 500)).toLong(), + monitor.hostIdleTimes["H01"]?.sum(), + ) { "Idle time incorrect" } + }, + { + assertEquals( + ((2 * 296 * 500) + 8000 + (120 * 500) + 1000).toLong(), + monitor.hostActiveTimes["H01"]?.sum(), + ) { "Active time incorrect" } + }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, + { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, + { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, + { + assertEquals( + (2 * 296 * 150.0) + (8 * 200.0) + (600 * 100.0) + + (120 * 150.0) + (200.0), + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect energy usage" } + }, + ) + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt index 2cd464a1..4a7c9341 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt +++ 
b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -23,118 +23,17 @@ package org.opendc.experiments.base import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.simulator.provisioner.Provisioner -import org.opendc.compute.simulator.provisioner.registerComputeMonitor -import org.opendc.compute.simulator.provisioner.setupComputeService -import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.filters.ComputeFilter -import org.opendc.compute.simulator.scheduler.filters.RamFilter -import org.opendc.compute.simulator.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.ComputeMonitor -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.TaskTableReader -import org.opendc.compute.topology.clusterTopology -import org.opendc.compute.topology.specs.ClusterSpec -import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.Task -import org.opendc.experiments.base.runner.replay import org.opendc.simulator.compute.workload.TraceFragment -import org.opendc.simulator.compute.workload.TraceWorkload -import org.opendc.simulator.kotlin.runSimulation -import java.io.File -import java.time.Duration -import java.time.LocalDateTime -import java.time.ZoneId import java.util.ArrayList -import java.util.UUID /** * Testing suite containing tests that specifically test the FlowDistributor */ class FlowDistributorTest { /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestComputeMonitor - - /** - * The [FilterScheduler] to use for all experiments. - */ - private lateinit var computeScheduler: FilterScheduler - - /** - * The [ComputeWorkloadLoader] responsible for loading the traces. - */ - private lateinit var workloadLoader: ComputeWorkloadLoader - - private val basePath = "src/test/resources/FlowDistributor" - - /** - * Set up the experimental environment. 
- */ - @BeforeEach - fun setUp() { - monitor = TestComputeMonitor() - computeScheduler = - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)), - ) - workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) - } - - private fun createTestTask( - name: String, - cpuCount: Int = 1, - cpuCapacity: Double = 0.0, - memCapacity: Long = 0L, - submissionTime: String = "1970-01-01T00:00", - duration: Long = 0L, - fragments: ArrayList<TraceFragment>, - ): Task { - return Task( - UUID.nameUUIDFromBytes(name.toByteArray()), - name, - cpuCount, - cpuCapacity, - memCapacity, - 1800000.0, - LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), - duration, - TraceWorkload( - fragments, - ), - ) - } - - private fun runTest( - topology: List<ClusterSpec>, - workload: ArrayList<Task>, - ): TestComputeMonitor { - runSimulation { - val monitor = monitor - val seed = 0L - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload) - } - } - return monitor - } - - /** * FlowDistributor test 1: A single fitting task * In this test, a single task is scheduled that should fit the FlowDistributor * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. @@ -154,17 +53,17 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -189,17 +88,17 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { 
assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -224,17 +123,17 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -259,17 +158,17 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, 
monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -294,17 +193,17 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -337,7 +236,7 @@ class FlowDistributorTest { ) val topology = createTopology("single_2_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, @@ -348,10 +247,10 @@ class FlowDistributorTest { { assertEquals(1000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(1000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is 
incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -384,7 +283,7 @@ class FlowDistributorTest { ) val topology = createTopology("single_2_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(6000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, @@ -395,10 +294,10 @@ class FlowDistributorTest { { assertEquals(6000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, - { assertEquals(11000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } }, ) } @@ -431,7 +330,7 @@ class FlowDistributorTest { ) val topology = createTopology("single_2_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, @@ -446,14 +345,14 @@ class FlowDistributorTest { { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, - { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, 
monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, ) } @@ -488,7 +387,7 @@ class FlowDistributorTest { ) val topology = createTopology("single_2_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, @@ -503,12 +402,12 @@ class FlowDistributorTest { { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, ) } @@ -543,7 +442,7 @@ class FlowDistributorTest { ) val topology = createTopology("single_2_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) assertAll( { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, @@ -562,14 +461,14 @@ class FlowDistributorTest { { assertEquals(2500.0, monitor.taskCpuSupplied["1"]?.get(5)) { "The cpu used by task 1 is incorrect" } }, { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(9)) { "The cpu used by task 1 is incorrect" } }, { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(14)) { "The cpu used by task 1 is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(5500.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, 
monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, - { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(5500.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } }, ) } @@ -588,13 +487,17 @@ class FlowDistributorTest { name = "0", fragments = arrayListOf<TraceFragment>().apply { - repeat(10) { this.add(TraceFragment(20 * 60 * 1000, 3000.0, 1)) } + repeat(1) { this.add(TraceFragment(10 * 60 * 1000, 3000.0, 1)) } }, ), ) val topology = createTopology("single_5000_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } }, + ) } /** @@ -618,7 +521,11 @@ class FlowDistributorTest { ) val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals(1000 * 10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } }, + ) } /** @@ -644,39 +551,10 @@ class FlowDistributorTest { } val topology = createTopology("single_1_2000.json") - monitor = runTest(topology, workload) - } + val monitor = runTest(topology, workload) - /** - * Obtain the topology factory for the test. 
- */ - private fun createTopology(name: String): List<ClusterSpec> { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/FlowDistributor/topologies/$name")) - return stream.use { clusterTopology(stream) } - } - - class TestComputeMonitor : ComputeMonitor { - var hostCpuDemands = ArrayList<Double>() - var hostCpuSupplied = ArrayList<Double>() - - override fun record(reader: HostTableReader) { - hostCpuDemands.add(reader.cpuDemand) - hostCpuSupplied.add(reader.cpuUsage) - } - - var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>() - var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>() - - override fun record(reader: TaskTableReader) { - val taskName: String = reader.taskInfo.name - - if (taskName in taskCpuDemands) { - taskCpuDemands[taskName]?.add(reader.cpuDemand) - taskCpuSupplied[taskName]?.add(reader.cpuUsage) - } else { - taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) - taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) - } - } + assertAll( + { assertEquals(1000 * 10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } }, + ) } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt new file mode 100644 index 00000000..e5613e50 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.base + +import org.opendc.compute.simulator.provisioner.Provisioner +import org.opendc.compute.simulator.provisioner.registerComputeMonitor +import org.opendc.compute.simulator.provisioner.setupComputeService +import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.compute.topology.clusterTopology +import org.opendc.compute.topology.specs.ClusterSpec +import org.opendc.compute.workload.Task +import org.opendc.experiments.base.experiment.specs.FailureModelSpec +import org.opendc.experiments.base.runner.replay +import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.kotlin.runSimulation +import java.time.Duration +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.UUID +import kotlin.collections.ArrayList + +/** + * Obtain the topology factory for the test. + */ +fun createTopology(name: String): List<ClusterSpec> { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name")) + return stream.use { clusterTopology(stream) } +} + +fun createTestTask( + name: String, + memCapacity: Long = 0L, + submissionTime: String = "1970-01-01T00:00", + duration: Long = 0L, + fragments: ArrayList<TraceFragment>, + checkpointInterval: Long = 0L, + checkpointDuration: Long = 0L, + checkpointIntervalScaling: Double = 1.0, +): Task { + return Task( + UUID.nameUUIDFromBytes(name.toByteArray()), + name, + fragments.maxOf { it.coreCount }, + fragments.maxOf { it.cpuUsage }, + memCapacity, + 1800000.0, + LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), + duration, + TraceWorkload( + fragments, + checkpointInterval, + checkpointDuration, + checkpointIntervalScaling, + ), + ) +} + +fun runTest( + topology: List<ClusterSpec>, + workload: ArrayList<Task>, + failureModelSpec: FailureModelSpec? = null, +): TestComputeMonitor { + val monitor = TestComputeMonitor() + + val computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + + runSimulation { + val seed = 0L + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.setTasksExpected(workload.size) + service.setMetricReader(provisioner.getMonitor()) + + service.replay(timeSource, workload, failureModelSpec = failureModelSpec) + } + } + return monitor +} + +class TestComputeMonitor : ComputeMonitor { + var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>() + var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>() + + override fun record(reader: TaskTableReader) { + val taskName: String = reader.taskInfo.name + + if (taskName in taskCpuDemands) { + taskCpuDemands[taskName]?.add(reader.cpuDemand) + taskCpuSupplied[taskName]?.add(reader.cpuUsage) + } else { + taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) + taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) + } + } + + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + var tasksPending = 0 + var tasksActive = 0 + var tasksTerminated = 0 + var tasksCompleted = 0 + + var timestamps = ArrayList<Long>() + + var maxTimestamp = 0L + + override fun record(reader: ServiceTableReader) { + attemptsSuccess = reader.attemptsSuccess + attemptsFailure = reader.attemptsFailure + attemptsError = 0 + tasksPending = reader.tasksPending + tasksActive = reader.tasksActive + tasksTerminated = reader.tasksTerminated + tasksCompleted = reader.tasksCompleted + + timestamps.add(reader.timestamp.toEpochMilli()) + maxTimestamp = reader.timestamp.toEpochMilli() + } + + var hostIdleTimes = mutableMapOf<String, ArrayList<Long>>() + var hostActiveTimes = mutableMapOf<String, ArrayList<Long>>() + var hostStealTimes = mutableMapOf<String, ArrayList<Long>>() + var hostLostTimes = mutableMapOf<String, ArrayList<Long>>() + + var hostCpuDemands = mutableMapOf<String, ArrayList<Double>>() + var hostCpuSupplied = mutableMapOf<String, ArrayList<Double>>() + var hostPowerDraws = mutableMapOf<String, ArrayList<Double>>() + var hostEnergyUsages = mutableMapOf<String, ArrayList<Double>>() + + override fun record(reader: HostTableReader) { + val hostName: String = reader.host.name + + if (!(hostName in hostCpuDemands)) { + hostIdleTimes[hostName] = ArrayList() + hostActiveTimes[hostName] = ArrayList() + hostStealTimes[hostName] = ArrayList() + hostLostTimes[hostName] = ArrayList() + + hostCpuDemands[hostName] = ArrayList() + hostCpuSupplied[hostName] = ArrayList() + hostPowerDraws[hostName] = ArrayList() + hostEnergyUsages[hostName] = ArrayList() + } + + hostIdleTimes[hostName]?.add(reader.cpuIdleTime) + hostActiveTimes[hostName]?.add(reader.cpuActiveTime) + hostStealTimes[hostName]?.add(reader.cpuStealTime) + hostLostTimes[hostName]?.add(reader.cpuLostTime) + + hostCpuDemands[hostName]?.add(reader.cpuDemand) + hostCpuSupplied[hostName]?.add(reader.cpuUsage) + hostPowerDraws[hostName]?.add(reader.powerDraw) + hostEnergyUsages[hostName]?.add(reader.energyUsage) + } + + var powerDraws = ArrayList<Double>() + var energyUsages = ArrayList<Double>() + + override fun record(reader: PowerSourceTableReader) { + powerDraws.add(reader.powerDraw) + energyUsages.add(reader.energyUsage) + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet Binary files differdeleted file mode 100644 index dbd93acb..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet +++ /dev/null diff --git 
a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet Binary files differdeleted file mode 100644 index d1f8b853..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet +++ /dev/null diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json deleted file mode 100644 index c3a060cc..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 32, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 256000 - } - } - ] - }, - { - "name": "C02", - "hosts" : - [ - { - "name": "H02", - "count": 6, - "cpu": - { - "coreCount": 8, - "coreSpeed": 2930 - }, - "memory": { - "memorySize": 64000 - } - } - ] - }, - { - "name": "C03", - "hosts" : - [ - { - "name": "H03", - "count": 2, - "cpu": - { - "coreCount": 16, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 128000 - } - } - ] - } - ] -} - diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json deleted file mode 100644 index de66bfc2..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 12, - "coreSpeed": 3300, - "count": 1 - }, - "memory": { - "memorySize": 140457600000 - } - } - ] - } - ] -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet Binary files differnew file mode 100644 index 00000000..44804ffa --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure_2.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure_2.parquet Binary files differnew file mode 100644 index 00000000..7dae482a --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure_2.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/two_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/two_failures.parquet Binary files differnew file mode 100644 index 00000000..9f0fea0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/two_failures.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_1_2000.json index 6790a10f..ac9a3082 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_1_2000.json +++ 
b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_1_2000.json @@ -14,6 +14,12 @@ }, "memory": { "memorySize": 140457600000 + }, + "powerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 } } ] diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_2_2000.json index 4bab620a..24ab0bcd 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_2_2000.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_2_2000.json @@ -14,6 +14,12 @@ }, "memory": { "memorySize": 140457600000 + }, + "powerModel": { + "modelType": "linear", + "power": 400.0, + "idlePower": 100.0, + "maxPower": 200.0 } } ] diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_5000_2000.json index 9f1e418f..9f1e418f 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/FlowDistributor/topologies/single_5000_2000.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single_5000_2000.json diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/fragments.parquet Binary files differindex 240f58e3..240f58e3 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/fragments.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/interference-model.json index 51fc6366..51fc6366 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/interference-model.json diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/tasks.parquet Binary files differindex 8e9dcea7..8e9dcea7 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/bitbrains-small/tasks.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/single_task/fragments.parquet Binary files differindex 94a2d69e..94a2d69e 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/single_task/fragments.parquet diff --git 
a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/single_task/tasks.parquet Binary files differindex 2a7da2eb..2a7da2eb 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/workload_traces/single_task/tasks.parquet
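
Note: the snippet below is an illustrative sketch, not part of this change. It shows how a test is expected to use the shared helpers introduced in TestingUtils.kt (createTestTask, createTopology, runTest and TestComputeMonitor) after the migration away from per-test monitors. The class and method names are hypothetical; the "H01" host name and the expected values follow the single_1_2000.json topology used in the tests above, assuming supply is capped at the host's 2000 MHz capacity as in those tests.

package org.opendc.experiments.base

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.simulator.compute.workload.TraceFragment

class TestingUtilsUsageSketch {
    @Test
    fun overloadedSingleHost() {
        // One task demanding 3000 MHz for 10 minutes; the single_1_2000 topology
        // provides a single host ("H01") with one 2000 MHz core, so supply is capped.
        val workload =
            arrayListOf(
                createTestTask(
                    name = "0",
                    fragments = arrayListOf(TraceFragment(10 * 60 * 1000, 3000.0, 1)),
                ),
            )
        val topology = createTopology("single_1_2000.json")

        // runTest wires up the scheduler, the hosts, and a fresh TestComputeMonitor,
        // replays the workload, and returns the monitor for assertions.
        val monitor = runTest(topology, workload)

        assertAll(
            // Task metrics are keyed by task name, host metrics by host name.
            { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
            { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
            { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
            { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
        )
    }
}

Because runTest registers the monitor with a one-minute export interval, index 1 corresponds to the second recorded sample of each metric series.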
