From b4f694d9083e28f67e1746a37f4761cda6699263 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 6 Dec 2024 09:21:19 +0100 Subject: Added 9 new tests specifically testing the Multiplexer. This assumes the Multiplexer is using MaxMinFairness given that this is currently the default and only fairness available in OpenDC. (#280) --- .../org/opendc/experiments/base/ExperimentTest.kt | 411 ++++++++++++++++ .../org/opendc/experiments/base/MultiplexerTest.kt | 544 +++++++++++++++++++++ .../experiments/base/ScenarioIntegrationTest.kt | 409 ---------------- .../opendc/experiments/base/ScenarioRunnerTest.kt | 79 --- .../Experiment/failureTraces/11_failures.parquet | Bin 0 -> 2786 bytes .../failureTraces/single_failure.parquet | Bin 0 -> 2786 bytes .../resources/Experiment/topologies/multi.json | 59 +++ .../resources/Experiment/topologies/single.json | 23 + .../traces/bitbrains-small/fragments.parquet | Bin 0 -> 717069 bytes .../traces/bitbrains-small/interference-model.json | 21 + .../traces/bitbrains-small/tasks.parquet | Bin 0 -> 5525 bytes .../traces/single_task/fragments.parquet | Bin 0 -> 3012 bytes .../Experiment/traces/single_task/tasks.parquet | Bin 0 -> 4471 bytes .../Multiplexer/topologies/single_1_2000.json | 22 + .../Multiplexer/topologies/single_2_2000.json | 22 + .../traces/multiplexer_test_1/fragments.parquet | Bin 0 -> 2684 bytes .../traces/multiplexer_test_1/tasks.parquet | Bin 0 -> 3919 bytes .../traces/multiplexer_test_2/fragments.parquet | Bin 0 -> 2684 bytes .../traces/multiplexer_test_2/tasks.parquet | Bin 0 -> 3919 bytes .../traces/multiplexer_test_3/fragments.parquet | Bin 0 -> 2684 bytes .../traces/multiplexer_test_3/tasks.parquet | Bin 0 -> 3919 bytes .../traces/multiplexer_test_4/fragments.parquet | Bin 0 -> 2684 bytes .../traces/multiplexer_test_4/tasks.parquet | Bin 0 -> 3919 bytes .../traces/multiplexer_test_5/fragments.parquet | Bin 0 -> 2689 bytes .../traces/multiplexer_test_5/tasks.parquet | Bin 0 -> 3924 bytes 
.../traces/multiplexer_test_6/fragments.parquet | Bin 0 -> 2689 bytes .../traces/multiplexer_test_6/tasks.parquet | Bin 0 -> 3924 bytes .../traces/multiplexer_test_7/fragments.parquet | Bin 0 -> 2689 bytes .../traces/multiplexer_test_7/tasks.parquet | Bin 0 -> 3940 bytes .../traces/multiplexer_test_8/fragments.parquet | Bin 0 -> 2697 bytes .../traces/multiplexer_test_8/tasks.parquet | Bin 0 -> 3940 bytes .../traces/multiplexer_test_9/fragments.parquet | Bin 0 -> 2709 bytes .../traces/multiplexer_test_9/tasks.parquet | Bin 0 -> 3924 bytes .../resources/failureTraces/11_failures.parquet | Bin 2786 -> 0 bytes .../resources/failureTraces/single_failure.parquet | Bin 2786 -> 0 bytes .../src/test/resources/model.json | 15 - .../src/test/resources/topologies/multi.json | 59 --- .../src/test/resources/topologies/single.json | 23 - .../traces/bitbrains-small/fragments.parquet | Bin 717069 -> 0 bytes .../traces/bitbrains-small/interference-model.json | 21 - .../resources/traces/bitbrains-small/tasks.parquet | Bin 5525 -> 0 bytes .../resources/traces/single_task/fragments.parquet | Bin 3012 -> 0 bytes .../resources/traces/single_task/tasks.parquet | Bin 4471 -> 0 bytes .../java/org/opendc/simulator/Multiplexer.java | 19 +- 44 files changed, 1113 insertions(+), 614 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt create mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet create mode 100644 
opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/fragments.parquet create mode 100644 
opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/tasks.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet delete mode 100644 
opendc-experiments/opendc-experiments-base/src/test/resources/model.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt new file mode 100644 index 00000000..8e9a3ad7 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt @@ -0,0 +1,411 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.simulator.provisioner.Provisioner +import org.opendc.compute.simulator.provisioner.registerComputeMonitor +import org.opendc.compute.simulator.provisioner.setupComputeService +import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.topology.clusterTopology +import org.opendc.compute.topology.specs.ClusterSpec +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.Task +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace +import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec +import org.opendc.experiments.base.runner.replay 
+import org.opendc.simulator.kotlin.runSimulation +import java.io.File +import java.util.Random + +/** + * An integration test suite for the Scenario experiments. + */ +class ExperimentTest { + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestComputeMonitor + + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + private val basePath = "src/test/resources/Experiment" + + /** + * Set up the experimental environment. + */ + @BeforeEach + fun setUp() { + monitor = TestComputeMonitor() + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSingleTask() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSingleTaskSingleFailure() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = + TraceBasedFailureModelSpec( + "$basePath/failureTraces/single_failure.parquet", + repeat = false, + ) + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.setTasksExpected(workload.size) + + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSingleTask11Failures() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet") + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSingleTaskCheckpoint() = + runSimulation { + val seed = 1L + workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 1000000L, 1000L, 1.0) + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet") + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, + { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSmall() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("bitbrains-small", 0.25, seed) + val topology = createTopology("single.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a large simulation setup. + */ + @Test + fun testLarge() = + runSimulation { + val seed = 0L + val workload = createTestWorkload("bitbrains-small", 1.0, seed) + val topology = createTopology("multi.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, + { assertEquals(43101787447, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412553, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestWorkload( + traceName: String, + fraction: Double, + seed: Long, + ): List { + val source = trace(traceName).sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed)) + } + + /** + * Obtain the topology factory for the test. 
+ */ + private fun createTopology(name: String): List { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Experiment/topologies/$name")) + return stream.use { clusterTopology(stream) } + } + + class TestComputeMonitor : ComputeMonitor { + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + var tasksPending = 0 + var tasksActive = 0 + var tasksTerminated = 0 + var tasksCompleted = 0 + + override fun record(reader: ServiceTableReader) { + attemptsSuccess = reader.attemptsSuccess + attemptsFailure = reader.attemptsFailure + attemptsError = 0 + tasksPending = reader.tasksPending + tasksActive = reader.tasksActive + tasksTerminated = reader.tasksTerminated + tasksCompleted = reader.tasksCompleted + } + + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var powerDraw = 0.0 + var energyUsage = 0.0 + var uptime = 0L + + override fun record(reader: HostTableReader) { + idleTime += reader.cpuIdleTime + activeTime += reader.cpuActiveTime + stealTime += reader.cpuStealTime + lostTime += reader.cpuLostTime + powerDraw += reader.powerDraw + energyUsage += reader.energyUsage + uptime += reader.uptime + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt new file mode 100644 index 00000000..1c0afd7f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/MultiplexerTest.kt @@ -0,0 +1,544 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the 
Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.simulator.provisioner.Provisioner +import org.opendc.compute.simulator.provisioner.registerComputeMonitor +import org.opendc.compute.simulator.provisioner.setupComputeService +import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.compute.topology.clusterTopology +import org.opendc.compute.topology.specs.ClusterSpec +import org.opendc.compute.workload.ComputeWorkloadLoader +import 
org.opendc.compute.workload.Task +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace +import org.opendc.experiments.base.runner.replay +import org.opendc.simulator.kotlin.runSimulation +import java.io.File +import java.time.Duration +import java.util.ArrayList +import java.util.Random + +/** + * Testing suite containing tests that specifically test the Multiplexer + */ +class MultiplexerTest { + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestComputeMonitor + + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + private val basePath = "src/test/resources/Multiplexer" + + /** + * Set up the experimental environment. + */ + @BeforeEach + fun setUp() { + monitor = TestComputeMonitor() + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) + } + + /** + * Multiplexer test 1: A single fitting task + * In this test, a single task is scheduled that should fit the Multiplexer + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. 
+ */ + @Test + fun testMultiplexer1() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_1", 1.0, seed) + val topology = createTopology("single_1_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 2: A single overloaded task + * In this test, a single task is scheduled that does not fit the Multiplexer + * In this test we expect the 
usage to be lower than the demand. + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testMultiplexer2() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_2", 1.0, seed) + val topology = createTopology("single_1_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + assertAll( + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer 
test 3: A single task transition fit to overloaded + * In this test, a single task is scheduled where the first fragment fits, but the second does not. + * For the second fragment, we expect the usage to be lower than the demand + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testMultiplexer3() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_3", 1.0, seed) + val topology = createTopology("single_1_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+ service.replay(timeSource, workload, seed = seed) + } + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 4: A single task transition overload to fit + * In this test, a single task is scheduled where the first fragment does not fit, and the second does. + * For the first fragment, we expect the usage of the first fragment to be lower than the demand + * We check if both the host and the Task show the correct cpu usage and demand during the two fragments. 
+ */ + @Test + fun testMultiplexer4() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_4", 1.0, seed) + val topology = createTopology("single_1_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + assertAll( + { assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 5: Two task, same time, both fit + * In this test, two tasks are scheduled, and they fit together on the host . 
The tasks start and finish at the same time + * This test shows how the multiplexer handles two tasks that can fit and no redistribution is required. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testMultiplexer5() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_5", 1.0, seed) + val topology = createTopology("single_2_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0,
monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 6: Two task, same time, can not fit + * In this test, two tasks are scheduled, and they can not both fit. The tasks start and finish at the same time + * This test shows how the multiplexer handles two tasks that both do not fit and redistribution is required. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + */ + @Test + fun testMultiplexer6() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_6", 1.0, seed) + val topology = createTopology("single_2_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
+ service.replay(timeSource, workload, seed = seed) + } + + println(monitor.taskCpuDemands) + println(monitor.hostCpuDemands) + + assertAll( + { assertEquals(6000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(5000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(5000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(6000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 7: Two tasks, both fit, second task is delayed + * In this test, two tasks are scheduled, the second task is delayed. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments.
+ */ + @Test + fun testMultiplexer7() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_7", 1.0, seed) + val topology = createTopology("single_2_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + println(monitor.taskCpuDemands) + println(monitor.hostCpuDemands) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is 
incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, + { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 8: Two task, both fit on their own but not together, second task is delayed + * In this test, two tasks are scheduled, the second task is delayed. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. + * When the second task comes in, the host is overloaded. + * This test shows how the multiplexer can handle redistribution when a new task comes in. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. 
+ */ + @Test + fun testMultiplexer8() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_8", 1.0, seed) + val topology = createTopology("single_2_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + println(monitor.taskCpuDemands) + println(monitor.hostCpuDemands) + + assertAll( + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is 
incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Multiplexer test 9: Two task, one changes demand, causing overload + * In this test, two tasks are scheduled, and they can both fit. + * However, task 0 increases its demand which overloads the multiplexer. + * This test shows how the multiplexer handles transition from fitting to overloading when multiple tasks are running. + * We check if both the host and the Tasks show the correct cpu usage and demand during the two fragments. 
+ */ + @Test + fun testMultiplexer9() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("multiplexer_test_9", 1.0, seed) + val topology = createTopology("single_2_2000.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + println(monitor.taskCpuDemands) + println(monitor.hostCpuDemands) + + assertAll( + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuDemands["0"]?.get(5)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(14)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1500.0, monitor.taskCpuSupplied["0"]?.get(5)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(9)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(14)) { "The cpu used by task 0 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(1)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(5)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(9)) { "The cpu demanded by task 1 is 
incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuDemands["1"]?.get(14)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2500.0, monitor.taskCpuSupplied["1"]?.get(5)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(9)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(14)) { "The cpu used by task 1 is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(5500.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } }, + { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } }, + { assertEquals(0.0, monitor.hostCpuDemands.last()) { "When the task is finished, the host should have 0.0 demand" } }, + { assertEquals(0.0, monitor.hostCpuSupplied.last()) { "When the task is finished, the host should have 0.0 demand" } }, + ) + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestWorkload( + traceName: String, + fraction: Double, + seed: Long, + ): List { + val source = trace(traceName).sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed)) + } + + /** + * Obtain the topology factory for the test. 
+ */ + private fun createTopology(name: String): List { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Multiplexer/topologies/$name")) + return stream.use { clusterTopology(stream) } + } + + class TestComputeMonitor : ComputeMonitor { + var hostCpuDemands = ArrayList() + var hostCpuSupplied = ArrayList() + + override fun record(reader: HostTableReader) { + hostCpuDemands.add(reader.cpuDemand) + hostCpuSupplied.add(reader.cpuUsage) + } + + var taskCpuDemands = mutableMapOf>() + var taskCpuSupplied = mutableMapOf>() + + override fun record(reader: TaskTableReader) { + val taskName: String = reader.taskInfo.name + + if (taskName in taskCpuDemands) { + taskCpuDemands[taskName]?.add(reader.cpuDemand) + taskCpuSupplied[taskName]?.add(reader.cpuUsage) + } else { + taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) + taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) + } + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt deleted file mode 100644 index 10478174..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of 
the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.base - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.simulator.provisioner.Provisioner -import org.opendc.compute.simulator.provisioner.registerComputeMonitor -import org.opendc.compute.simulator.provisioner.setupComputeService -import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.filters.ComputeFilter -import org.opendc.compute.simulator.scheduler.filters.RamFilter -import org.opendc.compute.simulator.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.ComputeMonitor -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.ServiceTableReader -import org.opendc.compute.topology.clusterTopology -import org.opendc.compute.topology.specs.ClusterSpec -import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.Task -import org.opendc.compute.workload.sampleByLoad -import org.opendc.compute.workload.trace -import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec -import 
org.opendc.experiments.base.runner.replay -import org.opendc.simulator.kotlin.runSimulation -import java.io.File -import java.util.Random - -/** - * An integration test suite for the Scenario experiments. - */ -class ScenarioIntegrationTest { - /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestComputeMonitor - - /** - * The [FilterScheduler] to use for all experiments. - */ - private lateinit var computeScheduler: FilterScheduler - - /** - * The [ComputeWorkloadLoader] responsible for loading the traces. - */ - private lateinit var workloadLoader: ComputeWorkloadLoader - - /** - * Set up the experimental environment. - */ - @BeforeEach - fun setUp() { - monitor = TestComputeMonitor() - computeScheduler = - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)), - ) - workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 0L, 0L, 0.0) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTask() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTaskSingleFailure() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = - TraceBasedFailureModelSpec( - "src/test/resources/failureTraces/single_failure.parquet", - repeat = false, - ) - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.setTasksExpected(workload.size) - - service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTask11Failures() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } }, - { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSingleTaskCheckpoint() = - runSimulation { - val seed = 1L - workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 1000000L, 1000L, 1.0) - val workload = createTestWorkload("single_task", 1.0, seed) - val topology = createTopology("single.json") - val monitor = monitor - val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, - { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, - { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Test a small simulation setup. - */ - @Test - fun testSmall() = - runSimulation { - val seed = 1L - val workload = createTestWorkload("bitbrains-small", 0.25, seed) - val topology = createTopology("single.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Test a large simulation setup. - */ - @Test - fun testLarge() = - runSimulation { - val seed = 0L - val workload = createTestWorkload("bitbrains-small", 1.0, seed) - val topology = createTopology("multi.json") - val monitor = monitor - - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) - } - - println( - "Scheduler " + - "Success=${monitor.attemptsSuccess} " + - "Failure=${monitor.attemptsFailure} " + - "Error=${monitor.attemptsError} " + - "Pending=${monitor.tasksPending} " + - "Active=${monitor.tasksActive}", - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, - ) - } - - /** - * Obtain the trace reader for the test. - */ - private fun createTestWorkload( - traceName: String, - fraction: Double, - seed: Long, - ): List { - val source = trace(traceName).sampleByLoad(fraction) - return source.resolve(workloadLoader, Random(seed)) - } - - /** - * Obtain the topology factory for the test. 
- */ - private fun createTopology(name: String): List { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name")) - return stream.use { clusterTopology(stream) } - } - - class TestComputeMonitor : ComputeMonitor { - var attemptsSuccess = 0 - var attemptsFailure = 0 - var attemptsError = 0 - var tasksPending = 0 - var tasksActive = 0 - var tasksTerminated = 0 - var tasksCompleted = 0 - - override fun record(reader: ServiceTableReader) { - attemptsSuccess = reader.attemptsSuccess - attemptsFailure = reader.attemptsFailure - attemptsError = 0 - tasksPending = reader.tasksPending - tasksActive = reader.tasksActive - tasksTerminated = reader.tasksTerminated - tasksCompleted = reader.tasksCompleted - } - - var idleTime = 0L - var activeTime = 0L - var stealTime = 0L - var lostTime = 0L - var powerDraw = 0.0 - var energyUsage = 0.0 - var uptime = 0L - - override fun record(reader: HostTableReader) { - idleTime += reader.cpuIdleTime - activeTime += reader.cpuActiveTime - stealTime += reader.cpuStealTime - lostTime += reader.cpuLostTime - powerDraw += reader.powerDraw - energyUsage += reader.energyUsage - uptime += reader.uptime - } - } -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt deleted file mode 100644 index 0b32b8f6..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the 
Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.base - -import java.io.File - -/** - * Test suite for [ScenarioRunner]. - */ -class ScenarioRunnerTest { - /** - * The path to the environments. - */ - private val envPath = File("src/test/resources/env") - - /** - * The path to the traces. - */ - private val tracePath = File("src/test/resources/trace") - - /** - * Smoke test with output. - * fixme: Fix failures and enable - * - fun testSmoke() { - val outputPath = Files.createTempDirectory("output").toFile() - - try { - val runner = ScenarioRunner(envPath, tracePath, outputPath) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-tasks" - ) - - assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } - } finally { - outputPath.delete() - } - } - - /** - * Smoke test without output. 
- * fixme: Fix failures and enable - */ - fun testSmokeNoOutput() { - val runner = ScenarioRunner(envPath, tracePath, null) - val scenario = Scenario( - Topology("topology"), - Workload("bitbrains-small", trace("bitbrains-small")), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-tasks" - ) - - assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } - } - */ -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet new file mode 100644 index 00000000..dbd93acb Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/11_failures.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet new file mode 100644 index 00000000..d1f8b853 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/failureTraces/single_failure.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json new file mode 100644 index 00000000..c3a060cc --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/multi.json @@ -0,0 +1,59 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 32, + "coreSpeed": 3200 + }, + "memory": { + "memorySize": 256000 + } + } + ] + }, + { + "name": "C02", + "hosts" : + [ + { + "name": "H02", + "count": 6, + "cpu": + { + "coreCount": 8, + "coreSpeed": 2930 + }, + "memory": { + "memorySize": 64000 + } + } 
+ ] + }, + { + "name": "C03", + "hosts" : + [ + { + "name": "H03", + "count": 2, + "cpu": + { + "coreCount": 16, + "coreSpeed": 3200 + }, + "memory": { + "memorySize": 128000 + } + } + ] + } + ] +} + diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json new file mode 100644 index 00000000..de66bfc2 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/topologies/single.json @@ -0,0 +1,23 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 12, + "coreSpeed": 3300, + "count": 1 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet new file mode 100644 index 00000000..240f58e3 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json new file mode 100644 index 00000000..51fc6366 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/interference-model.json @@ -0,0 +1,21 @@ +[ + { + "vms": [ + "141", + "379", + "851", + "116" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "205", + "116", + "463" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git 
a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet new file mode 100644 index 00000000..8e9dcea7 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/bitbrains-small/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet new file mode 100644 index 00000000..94a2d69e Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet new file mode 100644 index 00000000..2a7da2eb Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Experiment/traces/single_task/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json new file mode 100644 index 00000000..6790a10f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_1_2000.json @@ -0,0 +1,22 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 1, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json 
b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json new file mode 100644 index 00000000..4bab620a --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/topologies/single_2_2000.json @@ -0,0 +1,22 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 2, + "coreSpeed": 2000 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/fragments.parquet new file mode 100644 index 00000000..5d222b5c Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/tasks.parquet new file mode 100644 index 00000000..fa881645 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_1/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/fragments.parquet new file mode 100644 index 00000000..9fcf78f2 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/tasks.parquet 
b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/tasks.parquet new file mode 100644 index 00000000..fa881645 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_2/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/fragments.parquet new file mode 100644 index 00000000..238bad8f Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/tasks.parquet new file mode 100644 index 00000000..fa881645 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_3/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/fragments.parquet new file mode 100644 index 00000000..3e4bcc2a Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/tasks.parquet new file mode 100644 index 00000000..fa881645 Binary files /dev/null and 
b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_4/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/fragments.parquet new file mode 100644 index 00000000..e0a76334 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/tasks.parquet new file mode 100644 index 00000000..0982b0f7 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_5/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/fragments.parquet new file mode 100644 index 00000000..84d982da Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/tasks.parquet new file mode 100644 index 00000000..0982b0f7 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_6/tasks.parquet differ diff --git 
a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/fragments.parquet new file mode 100644 index 00000000..0cc276ef Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/tasks.parquet new file mode 100644 index 00000000..efd72165 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_7/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/fragments.parquet new file mode 100644 index 00000000..eaa964e3 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/tasks.parquet new file mode 100644 index 00000000..efd72165 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_8/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/fragments.parquet 
b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/fragments.parquet new file mode 100644 index 00000000..d5e7e0ae Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/tasks.parquet new file mode 100644 index 00000000..0982b0f7 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/Multiplexer/traces/multiplexer_test_9/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet deleted file mode 100644 index dbd93acb..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet deleted file mode 100644 index d1f8b853..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/model.json deleted file mode 100644 index 91e2657f..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/model.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "version": "1.0", - "defaultSchema": "trace", - "schemas": [ - { - "name": "trace", - "type": "custom", - "factory": 
"org.opendc.trace.calcite.TraceSchemaFactory", - "operand": { - "path": "trace", - "format": "opendc-vm" - } - } - ] -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json deleted file mode 100644 index c3a060cc..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 32, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 256000 - } - } - ] - }, - { - "name": "C02", - "hosts" : - [ - { - "name": "H02", - "count": 6, - "cpu": - { - "coreCount": 8, - "coreSpeed": 2930 - }, - "memory": { - "memorySize": 64000 - } - } - ] - }, - { - "name": "C03", - "hosts" : - [ - { - "name": "H03", - "count": 2, - "cpu": - { - "coreCount": 16, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 128000 - } - } - ] - } - ] -} - diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json deleted file mode 100644 index de66bfc2..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 12, - "coreSpeed": 3300, - "count": 1 - }, - "memory": { - "memorySize": 140457600000 - } - } - ] - } - ] -} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet deleted file mode 100644 index 240f58e3..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet 
and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json deleted file mode 100644 index 51fc6366..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "vms": [ - "141", - "379", - "851", - "116" - ], - "minServerLoad": 0.0, - "performanceScore": 0.8830158730158756 - }, - { - "vms": [ - "205", - "116", - "463" - ], - "minServerLoad": 0.0, - "performanceScore": 0.7133055555552751 - } -] diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet deleted file mode 100644 index 8e9dcea7..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet deleted file mode 100644 index 94a2d69e..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet deleted file mode 100644 index 2a7da2eb..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet and /dev/null differ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java 
b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java index ece90c20..48177412 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java @@ -40,7 +40,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private double totalDemand; // The total demand of all the consumers private double totalSupply; // The total supply from the supplier - private boolean overProvisioned = false; + private boolean overLoaded = false; private int currentConsumerIdx = -1; private double capacity; // What is the max capacity @@ -68,11 +68,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private void distributeSupply() { // if supply >= demand -> push supplies to all tasks - // TODO: possible optimization -> Only has to be done for the specific consumer that changed demand - if (this.totalSupply >= this.totalDemand) { + if (this.totalSupply > this.totalDemand) { // If this came from a state of over provisioning, provide all consumers with their demand - if (this.overProvisioned) { + if (this.overLoaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); } @@ -84,12 +83,12 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.currentConsumerIdx = -1; } - this.overProvisioned = false; + this.overLoaded = false; } // if supply < demand -> distribute the supply over all consumers else { - this.overProvisioned = true; + this.overLoaded = true; double[] supplies = redistributeSupply(this.demands, this.totalSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { @@ -178,6 +177,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.currentConsumerIdx = -1; + if (this.overLoaded) { + 
this.distributeSupply(); + } + this.pushDemand(this.supplierEdge, this.totalDemand); } @@ -205,7 +208,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer demands.set(idx, newDemand); this.totalDemand += (newDemand - prevDemand); - if (overProvisioned) { + if (overLoaded) { distributeSupply(); } @@ -216,7 +219,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer @Override public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers + this.totalSupply = newSupply; this.distributeSupply(); } -- cgit v1.2.3