summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-base/src/test/kotlin
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-10 14:34:50 +0100
committerGitHub <noreply@github.com>2025-01-10 14:34:50 +0100
commit192ca10f332c58d38d129709c64dd69c284683f5 (patch)
tree489d182bdc114631e37a5bb8306fceb529f374c2 /opendc-experiments/opendc-experiments-base/src/test/kotlin
parentf71e07f55a5176c5bd5447cdb3bcfebf2f5f47ee (diff)
Restructured Testing files (#287)
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/test/kotlin')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt616
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt419
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt272
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt206
4 files changed, 969 insertions, 544 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
index 845a8bae..e37bcd4a 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt
@@ -23,389 +23,311 @@
package org.opendc.experiments.base
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.simulator.provisioner.Provisioner
-import org.opendc.compute.simulator.provisioner.registerComputeMonitor
-import org.opendc.compute.simulator.provisioner.setupComputeService
-import org.opendc.compute.simulator.provisioner.setupHosts
-import org.opendc.compute.simulator.scheduler.FilterScheduler
-import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
-import org.opendc.compute.simulator.scheduler.filters.RamFilter
-import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
-import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.simulator.telemetry.ComputeMonitor
-import org.opendc.compute.simulator.telemetry.table.HostTableReader
-import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
-import org.opendc.compute.topology.clusterTopology
-import org.opendc.compute.topology.specs.ClusterSpec
-import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.Task
-import org.opendc.compute.workload.sampleByLoad
-import org.opendc.compute.workload.trace
-import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec
-import org.opendc.experiments.base.runner.replay
-import org.opendc.simulator.kotlin.runSimulation
-import java.io.File
-import java.util.Random
+import org.opendc.simulator.compute.workload.TraceFragment
+import java.util.ArrayList
/**
* An integration test suite for the Scenario experiments.
*/
class ExperimentTest {
/**
- * The monitor used to keep track of the metrics.
- */
- private lateinit var monitor: TestComputeMonitor
-
- /**
- * The [FilterScheduler] to use for all experiments.
- */
- private lateinit var computeScheduler: FilterScheduler
-
- /**
- * The [ComputeWorkloadLoader] responsible for loading the traces.
- */
- private lateinit var workloadLoader: ComputeWorkloadLoader
-
- private val basePath = "src/test/resources/Experiment"
-
- /**
- * Set up the experimental environment.
- */
- @BeforeEach
- fun setUp() {
- monitor = TestComputeMonitor()
- computeScheduler =
- FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
- )
- workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0)
- }
-
- /**
- * Test a small simulation setup.
+ * Simulator test 1: Single Task
+ * In this test, a single task is scheduled that takes 10 minutes to run.
+ *
+ * There should be no problems running the task, so the total runtime should be 10 min.
+ *
+ * The task is using 50% of the available CPU capacity.
+ * This means that half of the time is active, and half is idle.
+ * When the task is failed, all time is idle.
*/
@Test
- fun testSingleTask() =
- runSimulation {
- val seed = 1L
- val workload = createTestWorkload("single_task", 1.0, seed)
- val topology = createTopology("single.json")
- val monitor = monitor
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
-
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed)
- }
-
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ fun testSimulator1() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
)
- }
-
- /**
- * Test a small simulation setup.
- */
- @Test
- fun testSingleTaskSingleFailure() =
- runSimulation {
- val seed = 1L
- val workload = createTestWorkload("single_task", 1.0, seed)
- val topology = createTopology("single.json")
- val monitor = monitor
- val failureModelSpec =
- TraceBasedFailureModelSpec(
- "$basePath/failureTraces/single_failure.parquet",
- repeat = false,
- )
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
+ val topology = createTopology("single_1_2000.json")
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.setTasksExpected(workload.size)
+ val monitor = runTest(topology, workload)
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
- }
-
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
- )
- }
+ assertAll(
+ { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals((10 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(600 * 150.0, monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
/**
- * Test a small simulation setup.
+ * Simulator test 2: Two Tasks
+ * In this test, two tasks are scheduled.
+ *
+ * There should be no problems running the tasks, so the total runtime should be 15 min.
+ *
+ * The first task is using 50% of the available CPU capacity.
+ * The second task is using 100% of the available CPU capacity.
*/
@Test
- fun testSingleTask11Failures() =
- runSimulation {
- val seed = 1L
- val workload = createTestWorkload("single_task", 1.0, seed)
- val topology = createTopology("single.json")
- val monitor = monitor
- val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet")
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
-
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
- }
-
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } },
- { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ fun testSimulator2() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(5 * 60 * 1000, 2000.0, 1),
+ ),
+ ),
)
- }
-
- /**
- * Test a small simulation setup.
- */
- @Test
- fun testSingleTaskCheckpoint() =
- runSimulation {
- val seed = 1L
- workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 1000000L, 1000L, 1.0)
- val workload = createTestWorkload("single_task", 1.0, seed)
- val topology = createTopology("single.json")
- val monitor = monitor
- val failureModelSpec = TraceBasedFailureModelSpec("$basePath/failureTraces/11_failures.parquet")
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed)
- }
+ val topology = createTopology("single_1_2000.json")
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
+ val monitor = runTest(topology, workload)
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } },
- { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } },
- { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
- )
- }
+ assertAll(
+ { assertEquals(15 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals(((10 * 30000) + (5 * 60000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals((600 * 150.0) + (300 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
/**
- * Test a small simulation setup.
+ * Simulator test 3: Two identical Tasks scheduled simultaneously
+ * In this test, two identical tasks are scheduled at the same time.
+ *
+ * There should be no problems running the tasks, so the total runtime should be 10 min.
+ *
+ * Both tasks run for 10 minutes, each using 50% of the available CPU capacity,
+ * so they can be executed concurrently.
*/
@Test
- fun testSmall() =
- runSimulation {
- val seed = 1L
- val workload = createTestWorkload("bitbrains-small", 0.25, seed)
- val topology = createTopology("single.json")
- val monitor = monitor
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
+ fun testSimulator3() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ )
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed)
- }
+ val topology = createTopology("single_2_2000.json")
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
+ val monitor = runTest(topology, workload)
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
- )
- }
+ assertAll(
+ { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals((600 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ { assertEquals((600 * 150.0), monitor.energyUsages.sum()) { "Incorrect energy usage" } },
+ )
+ }
/**
- * Test a large simulation setup.
+ * Simulator test 4: Two Tasks, the second one submitted 20 minutes later
+ * In this test, two tasks are scheduled; the second task is submitted at minute 20.
+ *
+ * There should be no problems running the tasks, so the total runtime should be 25 min.
+ *
+ * The first task is using 50% of the available CPU capacity.
+ * The second task is using 100% of the available CPU capacity.
*/
@Test
- fun testLarge() =
- runSimulation {
- val seed = 0L
- val workload = createTestWorkload("bitbrains-small", 1.0, seed)
- val topology = createTopology("multi.json")
- val monitor = monitor
-
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
-
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload, seed = seed)
- }
-
- println(
- "Scheduler " +
- "Success=${monitor.attemptsSuccess} " +
- "Failure=${monitor.attemptsFailure} " +
- "Error=${monitor.attemptsError} " +
- "Pending=${monitor.tasksPending} " +
- "Active=${monitor.tasksActive}",
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") },
- { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
- { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
- { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
- { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(6.914184592181973E9, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
+ fun testSimulator4() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ createTestTask(
+ name = "1",
+ fragments =
+ arrayListOf(
+ TraceFragment(5 * 60 * 1000, 2000.0, 1),
+ ),
+ submissionTime = "1970-01-01T00:20",
+ ),
)
- }
- /**
- * Obtain the trace reader for the test.
- */
- private fun createTestWorkload(
- traceName: String,
- fraction: Double,
- seed: Long,
- ): List<Task> {
- val source = trace(traceName).sampleByLoad(fraction)
- return source.resolve(workloadLoader, Random(seed))
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(25 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000) + (10 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals(((10 * 30000) + (5 * 60000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ {
+ assertEquals(
+ (600 * 150.0) + (600 * 100.0) + (300 * 200.0),
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect energy usage" }
+ },
+ )
}
- /**
- * Obtain the topology factory for the test.
- */
- private fun createTopology(name: String): List<ClusterSpec> {
- val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/Experiment/topologies/$name"))
- return stream.use { clusterTopology(stream) }
- }
-
- class TestComputeMonitor : ComputeMonitor {
- var attemptsSuccess = 0
- var attemptsFailure = 0
- var attemptsError = 0
- var tasksPending = 0
- var tasksActive = 0
- var tasksTerminated = 0
- var tasksCompleted = 0
-
- override fun record(reader: ServiceTableReader) {
- attemptsSuccess = reader.attemptsSuccess
- attemptsFailure = reader.attemptsFailure
- attemptsError = 0
- tasksPending = reader.tasksPending
- tasksActive = reader.tasksActive
- tasksTerminated = reader.tasksTerminated
- tasksCompleted = reader.tasksCompleted
- }
-
- var idleTime = 0L
- var activeTime = 0L
- var stealTime = 0L
- var lostTime = 0L
- var powerDraw = 0.0
- var energyUsage = 0.0
- var uptime = 0L
-
- override fun record(reader: HostTableReader) {
- idleTime += reader.cpuIdleTime
- activeTime += reader.cpuActiveTime
- stealTime += reader.cpuStealTime
- lostTime += reader.cpuLostTime
- powerDraw += reader.powerDraw
- energyUsage += reader.energyUsage
- uptime += reader.uptime
- }
- }
+// /**
+// * Test a small simulation setup.
+// */
+// @Test
+// fun testSingleTask() =
+// runSimulation {
+// val seed = 1L
+// val workload = createTestWorkload("single_task", 1.0, seed)
+// val topology = createTopology("single.json")
+// val monitor = monitor
+//
+// Provisioner(dispatcher, seed).use { provisioner ->
+// provisioner.runSteps(
+// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
+// setupHosts(serviceDomain = "compute.opendc.org", topology),
+// )
+//
+// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+// service.replay(timeSource, workload, seed = seed)
+// }
+//
+// println(
+// "Scheduler " +
+// "Success=${monitor.attemptsSuccess} " +
+// "Failure=${monitor.attemptsFailure} " +
+// "Error=${monitor.attemptsError} " +
+// "Pending=${monitor.tasksPending} " +
+// "Active=${monitor.tasksActive}",
+// )
+//
+// // Note that these values have been verified beforehand
+// assertAll(
+// { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } },
+// { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } },
+// { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+// { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+// { assertEquals(1200000.0, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } },
+// )
+// }
+//
+// /**
+// * Test a small simulation setup.
+// */
+// @Test
+// fun testSmall() =
+// runSimulation {
+// val seed = 1L
+// val workload = createTestWorkload("bitbrains-small", 0.25, seed)
+// val topology = createTopology("single.json")
+// val monitor = monitor
+//
+// Provisioner(dispatcher, seed).use { provisioner ->
+// provisioner.runSteps(
+// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
+// setupHosts(serviceDomain = "compute.opendc.org", topology),
+// )
+//
+// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+// service.replay(timeSource, workload, seed = seed)
+// }
+//
+// println(
+// "Scheduler " +
+// "Success=${monitor.attemptsSuccess} " +
+// "Failure=${monitor.attemptsFailure} " +
+// "Error=${monitor.attemptsError} " +
+// "Pending=${monitor.tasksPending} " +
+// "Active=${monitor.tasksActive}",
+// )
+//
+// // Note that these values have been verified beforehand
+// assertAll(
+// { assertEquals(1803918435, monitor.idleTime) { "Idle time incorrect" } },
+// { assertEquals(787181565, monitor.activeTime) { "Active time incorrect" } },
+// { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+// { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+// { assertEquals(6.7565629E8, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } },
+// )
+// }
+//
+// /**
+// * Test a large simulation setup.
+// */
+// @Test
+// fun testLarge() =
+// runSimulation {
+// val seed = 0L
+// val workload = createTestWorkload("bitbrains-small", 1.0, seed)
+// val topology = createTopology("multi.json")
+// val monitor = monitor
+//
+// Provisioner(dispatcher, seed).use { provisioner ->
+// provisioner.runSteps(
+// setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+// registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
+// setupHosts(serviceDomain = "compute.opendc.org", topology),
+// )
+//
+// val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+// service.replay(timeSource, workload, seed = seed)
+// }
+//
+// println(
+// "Scheduler " +
+// "Success=${monitor.attemptsSuccess} " +
+// "Failure=${monitor.attemptsFailure} " +
+// "Error=${monitor.attemptsError} " +
+// "Pending=${monitor.tasksPending} " +
+// "Active=${monitor.tasksActive}",
+// )
+//
+// // Note that these values have been verified beforehand
+// assertAll(
+// { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+// { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") },
+// { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") },
+// { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
+// { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
+// { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
+// { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
+// { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
+// { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
+// { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
+// { assertEquals(6.914184592181973E9, monitor.hostEnergyUsage.sum(), 1E4) { "Incorrect energy usage" } },
+// )
+// }
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt
new file mode 100644
index 00000000..90737ab6
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt
@@ -0,0 +1,419 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.base
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.workload.Task
+import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec
+import org.opendc.simulator.compute.workload.TraceFragment
+import java.util.ArrayList
+
+/**
+ * An integration test suite for the Scenario experiments.
+ */
+class FailuresAndCheckpointingTest {
+ /**
+ * Failure test 1: Single Task, Single Failure
+ * In this test, a single task is scheduled that is interrupted by a failure after 5 min.
+ * Because there is no checkpointing, the full task has to be rerun.
+ *
+ * This means the final runtime is 20 minutes
+ *
+ * When the task is running, it is using 50% of the cpu.
+ * This means that half of the time is active, and half is idle.
+ * When the task is failed, all time is idle.
+ */
+ @Test
+ fun testFailures1() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/single_failure.parquet",
+ repeat = false,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals(20 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((15 * 30000) + (5 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals((15 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
+ { assertEquals((15 * 60 * 150.0) + (5 * 60 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+ /**
+ * Failure test 2: Single Task, Failure much later
+ * In this test, a single task is scheduled, with a failure trace.
+ *
+ * However, the first failure occurs after 500 min and should thus not affect the Task.
+ */
+ @Test
+ fun testFailures2() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/single_failure_2.parquet",
+ repeat = false,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals((10 * 30000).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals((10 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals((600 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+ /**
+ * Failure test 3: Single Task, Two Failures
+ * In this test, a single task is scheduled that is interrupted by two failures.
+ * Because there is no checkpointing, the full task has to be rerun after each failure.
+ *
+ * This means the final runtime is 37 minutes
+ *
+ * When the task is running, it is using 50% of the cpu.
+ * This means that half of the time is active, and half is idle.
+ * When the task is failed, all time is idle.
+ */
+ @Test
+ fun testFailures3() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/two_failures.parquet",
+ repeat = false,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals(37 * 60 * 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((22 * 30000) + (15 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals((22 * 30000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
+ { assertEquals((22 * 60 * 150.0) + (15 * 60 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+    /**
+     * Failure test 4: Single Task, repeated failure
+     * In this test, a single task is scheduled that is repeatedly interrupted by the same failure (repeat = true).
+     * Because there is no checkpointing, the full task has to be rerun after every failure.
+     *
+     * This means the final runtime is 95 minutes
+     *
+     * When the task is running, it is using 50% of the cpu.
+     * This means that half of the time is active, and half is idle.
+     * When the task is failed, all time is idle.
+     */
+ @Test
+ fun testFailures4() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/single_failure.parquet",
+ repeat = true,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals(95 * 60000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((50 * 60000) + (20 * 60000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals((25 * 60000).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(15)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(20)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(25)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(30)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(35)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(40)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(45)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(50)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(55)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(60)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(65)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(70)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(75)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(80)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(85)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(90)) { "Incorrect energy usage" } },
+ { assertEquals(0.0, monitor.hostEnergyUsages["H01"]?.get(95)) { "Incorrect energy usage" } },
+ { assertEquals((10 * 300 * 150.0) + (9 * 300 * 100.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+    /**
+     * Checkpoint test 1: Single Task with checkpointing
+     * In this test, a single task is scheduled with checkpointing enabled and no failures occur.
+     * A 1 second checkpoint is written after every 60 second interval, adding 9 checkpoints of overhead.
+     *
+     * This means the final runtime is 10 minutes plus 9 seconds of checkpoint overhead
+     *
+     * When the task is running, it is using 50% of the cpu.
+     * This means that half of the time is active, and half is idle.
+     * The 9 seconds spent writing checkpoints are counted as active time.
+     */
+ @Test
+ fun testCheckpoints1() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ ),
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals((10 * 60000) + (9 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals(((10 * 30000) + (9 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals((10 * 60 * 150.0) + (9 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+    /**
+     * Checkpoint test 2: Single Task with scaling checkpoint interval
+     * In this test, a single task is scheduled with checkpointing enabled and no failures occur.
+     * The checkpoint interval scales by 1.5 after each checkpoint, so only 4 checkpoints are written.
+     *
+     * This means the final runtime is 10 minutes plus 4 seconds of checkpoint overhead
+     *
+     * When the task is running, it is using 50% of the cpu.
+     * This means that half of the time is active, and half is idle.
+     * The 4 seconds spent writing checkpoints are counted as active time.
+     */
+ @Test
+ fun testCheckpoints2() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ checkpointIntervalScaling = 1.5,
+ ),
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals((10 * 60000) + (4 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } },
+ { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } },
+ { assertEquals(((10 * 30000) + (4 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } },
+ { assertEquals((10 * 60 * 150.0) + (4 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } },
+ )
+ }
+
+    /**
+     * Checkpoint test 3: Single Task, single failure with checkpointing
+     * In this test, a single task is scheduled that is interrupted by a failure after 5 min.
+     * Because checkpointing is enabled, the task resumes from its last checkpoint instead of rerunning in full.
+     *
+     */
+ @Test
+ fun testCheckpoints3() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/single_failure.parquet",
+ repeat = false,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals((960 * 1000) + 5000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((300 * 1000) + (296 * 500) + (360 * 500)).toLong(),
+ monitor.hostIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((296 * 500) + 4000 + (360 * 500) + 5000).toLong(),
+ monitor.hostActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
+ {
+ assertEquals(
+ (296 * 150.0) + (4 * 200.0) + (300 * 100.0) +
+ (360 * 150.0) + (5 * 200.0),
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect energy usage" }
+ },
+ )
+ }
+
+    /**
+     * Checkpoint test 4: Single Task, repeated failure with checkpointing
+     * In this test, a single task is scheduled that is repeatedly interrupted by failures.
+     * Because checkpointing is enabled, the task resumes from its last checkpoint after each failure.
+     *
+     */
+ @Test
+ fun testCheckpoints4() {
+ val workload: ArrayList<Task> =
+ arrayListOf(
+ createTestTask(
+ name = "0",
+ fragments =
+ arrayListOf(
+ TraceFragment(10 * 60 * 1000, 1000.0, 1),
+ ),
+ checkpointInterval = 60 * 1000L,
+ checkpointDuration = 1000L,
+ ),
+ )
+
+ val failureModelSpec =
+ TraceBasedFailureModelSpec(
+ "src/test/resources/failureTraces/single_failure.parquet",
+ repeat = true,
+ )
+
+ val topology = createTopology("single_1_2000.json")
+
+ val monitor = runTest(topology, workload, failureModelSpec)
+
+ assertAll(
+ { assertEquals((22 * 60000) + 1000, monitor.maxTimestamp) { "Total runtime incorrect" } },
+ {
+ assertEquals(
+ ((10 * 60000) + (2 * 296 * 500) + (120 * 500)).toLong(),
+ monitor.hostIdleTimes["H01"]?.sum(),
+ ) { "Idle time incorrect" }
+ },
+ {
+ assertEquals(
+ ((2 * 296 * 500) + 8000 + (120 * 500) + 1000).toLong(),
+ monitor.hostActiveTimes["H01"]?.sum(),
+ ) { "Active time incorrect" }
+ },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } },
+ { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } },
+ { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } },
+ {
+ assertEquals(
+ (2 * 296 * 150.0) + (8 * 200.0) + (600 * 100.0) +
+ (120 * 150.0) + (200.0),
+ monitor.hostEnergyUsages["H01"]?.sum(),
+ ) { "Incorrect energy usage" }
+ },
+ )
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
index 2cd464a1..4a7c9341 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt
@@ -23,118 +23,17 @@
package org.opendc.experiments.base
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.simulator.provisioner.Provisioner
-import org.opendc.compute.simulator.provisioner.registerComputeMonitor
-import org.opendc.compute.simulator.provisioner.setupComputeService
-import org.opendc.compute.simulator.provisioner.setupHosts
-import org.opendc.compute.simulator.scheduler.FilterScheduler
-import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
-import org.opendc.compute.simulator.scheduler.filters.RamFilter
-import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
-import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.service.ComputeService
-import org.opendc.compute.simulator.telemetry.ComputeMonitor
-import org.opendc.compute.simulator.telemetry.table.HostTableReader
-import org.opendc.compute.simulator.telemetry.table.TaskTableReader
-import org.opendc.compute.topology.clusterTopology
-import org.opendc.compute.topology.specs.ClusterSpec
-import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.Task
-import org.opendc.experiments.base.runner.replay
import org.opendc.simulator.compute.workload.TraceFragment
-import org.opendc.simulator.compute.workload.TraceWorkload
-import org.opendc.simulator.kotlin.runSimulation
-import java.io.File
-import java.time.Duration
-import java.time.LocalDateTime
-import java.time.ZoneId
import java.util.ArrayList
-import java.util.UUID
/**
* Testing suite containing tests that specifically test the FlowDistributor
*/
class FlowDistributorTest {
/**
- * The monitor used to keep track of the metrics.
- */
- private lateinit var monitor: TestComputeMonitor
-
- /**
- * The [FilterScheduler] to use for all experiments.
- */
- private lateinit var computeScheduler: FilterScheduler
-
- /**
- * The [ComputeWorkloadLoader] responsible for loading the traces.
- */
- private lateinit var workloadLoader: ComputeWorkloadLoader
-
- private val basePath = "src/test/resources/FlowDistributor"
-
- /**
- * Set up the experimental environment.
- */
- @BeforeEach
- fun setUp() {
- monitor = TestComputeMonitor()
- computeScheduler =
- FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
- )
- workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0)
- }
-
- private fun createTestTask(
- name: String,
- cpuCount: Int = 1,
- cpuCapacity: Double = 0.0,
- memCapacity: Long = 0L,
- submissionTime: String = "1970-01-01T00:00",
- duration: Long = 0L,
- fragments: ArrayList<TraceFragment>,
- ): Task {
- return Task(
- UUID.nameUUIDFromBytes(name.toByteArray()),
- name,
- cpuCount,
- cpuCapacity,
- memCapacity,
- 1800000.0,
- LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(),
- duration,
- TraceWorkload(
- fragments,
- ),
- )
- }
-
- private fun runTest(
- topology: List<ClusterSpec>,
- workload: ArrayList<Task>,
- ): TestComputeMonitor {
- runSimulation {
- val monitor = monitor
- val seed = 0L
- Provisioner(dispatcher, seed).use { provisioner ->
- provisioner.runSteps(
- setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
- registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)),
- setupHosts(serviceDomain = "compute.opendc.org", topology),
- )
-
- val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
- service.replay(timeSource, workload)
- }
- }
- return monitor
- }
-
- /**
* FlowDistributor test 1: A single fitting task
* In this test, a single task is scheduled that should fit the FlowDistributor
* We check if both the host and the Task show the correct cpu usage and demand during the two fragments.
@@ -154,17 +53,17 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -189,17 +88,17 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -224,17 +123,17 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -259,17 +158,17 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(1000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -294,17 +193,17 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(4000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuDemands["0"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["0"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -337,7 +236,7 @@ class FlowDistributorTest {
)
val topology = createTopology("single_2_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
@@ -348,10 +247,10 @@ class FlowDistributorTest {
{ assertEquals(1000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(1000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -384,7 +283,7 @@ class FlowDistributorTest {
)
val topology = createTopology("single_2_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(6000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
@@ -395,10 +294,10 @@ class FlowDistributorTest {
{ assertEquals(6000.0, monitor.taskCpuDemands["1"]?.get(10)) { "The cpu demanded by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 0 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(10)) { "The cpu used by task 0 is incorrect" } },
- { assertEquals(11000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(11000.0, monitor.hostCpuDemands[10]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[10]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(11000.0, monitor.hostCpuDemands["H01"]?.get(10)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(10)) { "The cpu used by the host is incorrect" } },
)
}
@@ -431,7 +330,7 @@ class FlowDistributorTest {
)
val topology = createTopology("single_2_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
@@ -446,14 +345,14 @@ class FlowDistributorTest {
{ assertEquals(2000.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(1000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
- { assertEquals(2000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(1000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(2000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } },
)
}
@@ -488,7 +387,7 @@ class FlowDistributorTest {
)
val topology = createTopology("single_2_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(3000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
@@ -503,12 +402,12 @@ class FlowDistributorTest {
{ assertEquals(1500.0, monitor.taskCpuDemands["1"]?.get(6)) { "The cpu demanded by task 1 is incorrect" } },
{ assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(1)) { "The cpu used by task 1 is incorrect" } },
{ assertEquals(1500.0, monitor.taskCpuSupplied["1"]?.get(6)) { "The cpu used by task 1 is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
- { assertEquals(3000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(3000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } },
)
}
@@ -543,7 +442,7 @@ class FlowDistributorTest {
)
val topology = createTopology("single_2_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
assertAll(
{ assertEquals(1000.0, monitor.taskCpuDemands["0"]?.get(1)) { "The cpu demanded by task 0 is incorrect" } },
@@ -562,14 +461,14 @@ class FlowDistributorTest {
{ assertEquals(2500.0, monitor.taskCpuSupplied["1"]?.get(5)) { "The cpu used by task 1 is incorrect" } },
{ assertEquals(2000.0, monitor.taskCpuSupplied["1"]?.get(9)) { "The cpu used by task 1 is incorrect" } },
{ assertEquals(3000.0, monitor.taskCpuSupplied["1"]?.get(14)) { "The cpu used by task 1 is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[1]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4500.0, monitor.hostCpuDemands[5]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(5500.0, monitor.hostCpuDemands[9]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuDemands[14]) { "The cpu demanded by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[1]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[5]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[9]) { "The cpu used by the host is incorrect" } },
- { assertEquals(4000.0, monitor.hostCpuSupplied[14]) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(1)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4500.0, monitor.hostCpuDemands["H01"]?.get(5)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(5500.0, monitor.hostCpuDemands["H01"]?.get(9)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuDemands["H01"]?.get(14)) { "The cpu demanded by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(1)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(5)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(9)) { "The cpu used by the host is incorrect" } },
+ { assertEquals(4000.0, monitor.hostCpuSupplied["H01"]?.get(14)) { "The cpu used by the host is incorrect" } },
)
}
@@ -588,13 +487,17 @@ class FlowDistributorTest {
name = "0",
fragments =
arrayListOf<TraceFragment>().apply {
- repeat(10) { this.add(TraceFragment(20 * 60 * 1000, 3000.0, 1)) }
+ repeat(1) { this.add(TraceFragment(10 * 60 * 1000, 3000.0, 1)) }
},
),
)
val topology = createTopology("single_5000_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } },
+ )
}
/**
@@ -618,7 +521,11 @@ class FlowDistributorTest {
)
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
+ val monitor = runTest(topology, workload)
+
+ assertAll(
+ { assertEquals(1000 * 10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } },
+ )
}
/**
@@ -644,39 +551,10 @@ class FlowDistributorTest {
}
val topology = createTopology("single_1_2000.json")
- monitor = runTest(topology, workload)
- }
+ val monitor = runTest(topology, workload)
- /**
- * Obtain the topology factory for the test.
- */
- private fun createTopology(name: String): List<ClusterSpec> {
- val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/FlowDistributor/topologies/$name"))
- return stream.use { clusterTopology(stream) }
- }
-
- class TestComputeMonitor : ComputeMonitor {
- var hostCpuDemands = ArrayList<Double>()
- var hostCpuSupplied = ArrayList<Double>()
-
- override fun record(reader: HostTableReader) {
- hostCpuDemands.add(reader.cpuDemand)
- hostCpuSupplied.add(reader.cpuUsage)
- }
-
- var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>()
- var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>()
-
- override fun record(reader: TaskTableReader) {
- val taskName: String = reader.taskInfo.name
-
- if (taskName in taskCpuDemands) {
- taskCpuDemands[taskName]?.add(reader.cpuDemand)
- taskCpuSupplied[taskName]?.add(reader.cpuUsage)
- } else {
- taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand)
- taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage)
- }
- }
+ assertAll(
+ { assertEquals(1000 * 10 * 60 * 1000, monitor.maxTimestamp) { "The expected runtime is exceeded" } },
+ )
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
new file mode 100644
index 00000000..e5613e50
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.base
+
+import org.opendc.compute.simulator.provisioner.Provisioner
+import org.opendc.compute.simulator.provisioner.registerComputeMonitor
+import org.opendc.compute.simulator.provisioner.setupComputeService
+import org.opendc.compute.simulator.provisioner.setupHosts
+import org.opendc.compute.simulator.scheduler.FilterScheduler
+import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
+import org.opendc.compute.simulator.scheduler.filters.RamFilter
+import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
+import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.compute.simulator.telemetry.ComputeMonitor
+import org.opendc.compute.simulator.telemetry.table.HostTableReader
+import org.opendc.compute.simulator.telemetry.table.PowerSourceTableReader
+import org.opendc.compute.simulator.telemetry.table.ServiceTableReader
+import org.opendc.compute.simulator.telemetry.table.TaskTableReader
+import org.opendc.compute.topology.clusterTopology
+import org.opendc.compute.topology.specs.ClusterSpec
+import org.opendc.compute.workload.Task
+import org.opendc.experiments.base.experiment.specs.FailureModelSpec
+import org.opendc.experiments.base.runner.replay
+import org.opendc.simulator.compute.workload.TraceFragment
+import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.kotlin.runSimulation
+import java.time.Duration
+import java.time.LocalDateTime
+import java.time.ZoneId
+import java.util.UUID
+import kotlin.collections.ArrayList
+
+/**
+ * Obtain the topology factory for the test.
+ */
+fun createTopology(name: String): List<ClusterSpec> {
+ val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name"))
+ return stream.use { clusterTopology(stream) }
+}
+
+fun createTestTask(
+ name: String,
+ memCapacity: Long = 0L,
+ submissionTime: String = "1970-01-01T00:00",
+ duration: Long = 0L,
+ fragments: ArrayList<TraceFragment>,
+ checkpointInterval: Long = 0L,
+ checkpointDuration: Long = 0L,
+ checkpointIntervalScaling: Double = 1.0,
+): Task {
+ return Task(
+ UUID.nameUUIDFromBytes(name.toByteArray()),
+ name,
+ fragments.maxOf { it.coreCount },
+ fragments.maxOf { it.cpuUsage },
+ memCapacity,
+ 1800000.0,
+ LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(),
+ duration,
+ TraceWorkload(
+ fragments,
+ checkpointInterval,
+ checkpointDuration,
+ checkpointIntervalScaling,
+ ),
+ )
+}
+
+fun runTest(
+ topology: List<ClusterSpec>,
+ workload: ArrayList<Task>,
+ failureModelSpec: FailureModelSpec? = null,
+): TestComputeMonitor {
+ val monitor = TestComputeMonitor()
+
+ val computeScheduler =
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
+ )
+
+ runSimulation {
+ val seed = 0L
+ Provisioner(dispatcher, seed).use { provisioner ->
+ provisioner.runSteps(
+ setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
+ registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)),
+ setupHosts(serviceDomain = "compute.opendc.org", topology),
+ )
+
+ val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!
+ service.setTasksExpected(workload.size)
+ service.setMetricReader(provisioner.getMonitor())
+
+ service.replay(timeSource, workload, failureModelSpec = failureModelSpec)
+ }
+ }
+ return monitor
+}
+
+class TestComputeMonitor : ComputeMonitor {
+ var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>()
+ var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>()
+
+ override fun record(reader: TaskTableReader) {
+ val taskName: String = reader.taskInfo.name
+
+ if (taskName in taskCpuDemands) {
+ taskCpuDemands[taskName]?.add(reader.cpuDemand)
+ taskCpuSupplied[taskName]?.add(reader.cpuUsage)
+ } else {
+ taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand)
+ taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage)
+ }
+ }
+
+ var attemptsSuccess = 0
+ var attemptsFailure = 0
+ var attemptsError = 0
+ var tasksPending = 0
+ var tasksActive = 0
+ var tasksTerminated = 0
+ var tasksCompleted = 0
+
+ var timestamps = ArrayList<Long>()
+
+ var maxTimestamp = 0L
+
+ override fun record(reader: ServiceTableReader) {
+ attemptsSuccess = reader.attemptsSuccess
+ attemptsFailure = reader.attemptsFailure
+ attemptsError = 0
+ tasksPending = reader.tasksPending
+ tasksActive = reader.tasksActive
+ tasksTerminated = reader.tasksTerminated
+ tasksCompleted = reader.tasksCompleted
+
+ timestamps.add(reader.timestamp.toEpochMilli())
+ maxTimestamp = reader.timestamp.toEpochMilli()
+ }
+
+ var hostIdleTimes = mutableMapOf<String, ArrayList<Long>>()
+ var hostActiveTimes = mutableMapOf<String, ArrayList<Long>>()
+ var hostStealTimes = mutableMapOf<String, ArrayList<Long>>()
+ var hostLostTimes = mutableMapOf<String, ArrayList<Long>>()
+
+ var hostCpuDemands = mutableMapOf<String, ArrayList<Double>>()
+ var hostCpuSupplied = mutableMapOf<String, ArrayList<Double>>()
+ var hostPowerDraws = mutableMapOf<String, ArrayList<Double>>()
+ var hostEnergyUsages = mutableMapOf<String, ArrayList<Double>>()
+
+ override fun record(reader: HostTableReader) {
+ val hostName: String = reader.host.name
+
+ if (!(hostName in hostCpuDemands)) {
+ hostIdleTimes[hostName] = ArrayList()
+ hostActiveTimes[hostName] = ArrayList()
+ hostStealTimes[hostName] = ArrayList()
+ hostLostTimes[hostName] = ArrayList()
+
+ hostCpuDemands[hostName] = ArrayList()
+ hostCpuSupplied[hostName] = ArrayList()
+ hostPowerDraws[hostName] = ArrayList()
+ hostEnergyUsages[hostName] = ArrayList()
+ }
+
+ hostIdleTimes[hostName]?.add(reader.cpuIdleTime)
+ hostActiveTimes[hostName]?.add(reader.cpuActiveTime)
+ hostStealTimes[hostName]?.add(reader.cpuStealTime)
+ hostLostTimes[hostName]?.add(reader.cpuLostTime)
+
+ hostCpuDemands[hostName]?.add(reader.cpuDemand)
+ hostCpuSupplied[hostName]?.add(reader.cpuUsage)
+ hostPowerDraws[hostName]?.add(reader.powerDraw)
+ hostEnergyUsages[hostName]?.add(reader.energyUsage)
+ }
+
+ var powerDraws = ArrayList<Double>()
+ var energyUsages = ArrayList<Double>()
+
+ override fun record(reader: PowerSourceTableReader) {
+ powerDraws.add(reader.powerDraw)
+ energyUsages.add(reader.energyUsage)
+ }
+}