From 5a365dbc068f2a8cdfa9813c39cc84bb30e15637 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 25 Oct 2024 13:32:41 +0200 Subject: Rewrote the FlowEngine (#256) * Removed unused components. Updated tests. Improved checkpointing model Improved model, started with SimPowerSource implemented FailureModels and Checkpointing First working version midway commit first update All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. * fixed merge conflicts * Updated M3SA paths. * Fixed small typo --- .../opendc-experiments-base/build.gradle.kts | 2 - .../experiments/base/runner/ScenarioReplayer.kt | 39 ++-- .../experiments/base/runner/ScenarioRunner.kt | 23 ++- .../experiments/base/scenario/ExperimentReader.kt | 2 +- .../opendc/experiments/base/scenario/Scenario.kt | 2 +- .../base/scenario/specs/AllocationPolicySpec.kt | 2 +- .../base/scenario/specs/ExperimentSpec.kt | 2 +- .../base/scenario/specs/FailureModelSpec.kt | 2 +- .../base/scenario/specs/ScenarioSpec.kt | 2 +- .../experiments/base/ScenarioIntegrationTest.kt | 218 +++++++++++++++------ .../src/test/resources/env/multi.json | 59 ------ .../src/test/resources/env/single.json | 23 --- .../resources/failureTraces/11_failures.parquet | Bin 0 -> 2786 bytes .../resources/failureTraces/single_failure.parquet | Bin 0 -> 2786 bytes .../src/test/resources/topologies/multi.json | 59 ++++++ .../src/test/resources/topologies/single.json | 23 +++ .../trace/bitbrains-small/fragments.parquet | Bin 717069 -> 0 bytes .../trace/bitbrains-small/interference-model.json | 21 -- .../resources/trace/bitbrains-small/tasks.parquet | Bin 5525 -> 0 bytes .../traces/bitbrains-small/fragments.parquet | Bin 0 -> 717069 bytes .../traces/bitbrains-small/interference-model.json | 21 ++ .../resources/traces/bitbrains-small/tasks.parquet | Bin 0 -> 5525 bytes .../resources/traces/single_task/fragments.parquet | Bin 0 -> 3012 bytes .../resources/traces/single_task/tasks.parquet | Bin 0 -> 4471 bytes 24 files changed, 302 insertions(+), 198 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json delete mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet create mode 100644 opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet (limited to 'opendc-experiments/opendc-experiments-base') diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index 98ec6723..d8921ffb 100644 --- a/opendc-experiments/opendc-experiments-base/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -33,7 +33,6 @@ plugins { dependencies { - api(projects.opendcCompute.opendcComputeService) api(projects.opendcCompute.opendcComputeSimulator) implementation(libs.clikt) @@ -42,7 +41,6 @@ dependencies { implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-core"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-workload"))) - implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-telemetry"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-topology"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon"))) implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-failure"))) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index 49fa409e..c82e2557 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -29,13 +29,12 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.yield -import org.opendc.compute.api.Task import org.opendc.compute.api.TaskState -import org.opendc.compute.api.TaskWatcher import org.opendc.compute.failure.models.FailureModel -import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.VirtualMachine -import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec +import org.opendc.compute.simulator.TaskWatcher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.workload.Task import org.opendc.experiments.base.scenario.specs.FailureModelSpec import org.opendc.experiments.base.scenario.specs.createFailureModel import java.time.InstantSource @@ -50,7 +49,7 @@ import kotlin.math.max */ public class RunningTaskWatcher : TaskWatcher { // TODO: make this changeable - private val unlockStates: List = listOf(TaskState.DELETED, TaskState.TERMINATED) + private val unlockStates: List = listOf(TaskState.DELETED) private val mutex: Mutex = Mutex() @@ -63,7 +62,7 @@ public class RunningTaskWatcher : TaskWatcher { } override fun onStateChanged( - task: Task, + task: ServiceTask, newState: TaskState, ) { if (unlockStates.contains(newState)) { @@ -73,7 +72,7 @@ public class RunningTaskWatcher : TaskWatcher { } /** - * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * Helper method to replay the specified list of [Task] and suspend execution util all VMs have finished. * * @param clock The simulation clock. * @param trace The trace to simulate. @@ -83,9 +82,8 @@ public class RunningTaskWatcher : TaskWatcher { */ public suspend fun ComputeService.replay( clock: InstantSource, - trace: List, + trace: List, failureModelSpec: FailureModelSpec? = null, - checkpointModelSpec: CheckpointModelSpec? = null, seed: Long = 0, submitImmediately: Boolean = false, ) { @@ -97,9 +95,6 @@ public suspend fun ComputeService.replay( createFailureModel(coroutineContext, clock, this, Random(seed), it) } - // Create new image for the virtual machine - val image = client.newImage("vm-image") - try { coroutineScope { // Start the fault injector @@ -107,9 +102,9 @@ public suspend fun ComputeService.replay( var simulationOffset = Long.MIN_VALUE - for (entry in trace.sortedBy { it.startTime }) { + for (entry in trace.sortedBy { it.submissionTime }) { val now = clock.millis() - val start = entry.startTime.toEpochMilli() + val start = entry.submissionTime.toEpochMilli() // Set the simulationOffset based on the starting time of the first task if (simulationOffset == Long.MIN_VALUE) { @@ -121,25 +116,21 @@ public suspend fun ComputeService.replay( delay(max(0, (start - now - simulationOffset))) } - val checkpointInterval = checkpointModelSpec?.checkpointInterval ?: 0L - val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L - val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 - - val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling) + val workload = entry.trace val meta = mutableMapOf("workload" to workload) launch { val task = client.newTask( entry.name, - image, client.newFlavor( entry.name, entry.cpuCount, entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), + if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), ), - meta = meta, + workload, + meta, ) val taskWatcher = RunningTaskWatcher() @@ -150,7 +141,7 @@ public suspend fun ComputeService.replay( taskWatcher.wait() // Stop the task after reaching the end-time of the virtual machine - task.delete() +// task.delete() } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 2bd9dfa3..df5aabf7 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -26,13 +26,13 @@ import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.carbon.getCarbonTrace -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.createComputeScheduler import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor +import org.opendc.compute.simulator.scheduler.createComputeScheduler +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.experiments.base.scenario.Scenario @@ -89,14 +89,24 @@ public fun runScenario( { createComputeScheduler(scenario.allocationPolicySpec.policyType, Random(it.seeder.nextLong())) }, maxNumFailures = scenario.maxNumFailures, ), - setupHosts(serviceDomain, topology, optimize = true), + setupHosts(serviceDomain, topology), ) - val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile)) + val checkpointInterval = scenario.checkpointModelSpec?.checkpointInterval ?: 0L + val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L + val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 + + val workloadLoader = + ComputeWorkloadLoader( + File(scenario.workloadSpec.pathToFile), + checkpointInterval, + checkpointDuration, + checkpointIntervalScaling, + ) val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) val carbonTrace = getCarbonTrace(scenario.carbonTracePath) - val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli()) + val startTime = Duration.ofMillis(tasks.minOf { it.submissionTime }.toEpochMilli()) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! @@ -104,7 +114,6 @@ public fun runScenario( timeSource, tasks, failureModelSpec = scenario.failureModelSpec, - checkpointModelSpec = scenario.checkpointModelSpec, seed = seed, ) } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt index 160bd783..8ed60b08 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromStream -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.ExperimentSpec import java.io.File import java.io.InputStream diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt index 91cd09ba..f649e4f8 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.base.scenario -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec import org.opendc.experiments.base.scenario.specs.ExportModelSpec diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt index edfdfaf5..ddc11a50 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.service.scheduler.ComputeSchedulerEnum +import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum /** * specification describing how tasks are allocated diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt index 60fcf51a..b957ea18 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable import org.opendc.common.logger.infoNewLine import org.opendc.common.logger.logger -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import java.util.UUID /** diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt index a27e77bc..c20b4467 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt @@ -61,7 +61,7 @@ import org.opendc.compute.failure.models.SampleBasedFailureModel import org.opendc.compute.failure.models.TraceBasedFailureModel import org.opendc.compute.failure.prefab.FailurePrefab import org.opendc.compute.failure.prefab.createFailureModelPrefab -import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.service.ComputeService import java.io.File import java.time.InstantSource import kotlin.coroutines.CoroutineContext diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt index d7fdb8f4..8f2146f1 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig @Serializable public data class ScenarioSpec( diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt index 08eddca0..41d18225 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt @@ -26,26 +26,27 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.telemetry.ComputeMonitor -import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader import org.opendc.compute.topology.clusterTopology import org.opendc.compute.topology.specs.HostSpec import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import org.opendc.compute.workload.sampleByLoad import org.opendc.compute.workload.trace import org.opendc.experiments.base.runner.replay +import org.opendc.experiments.base.scenario.specs.TraceBasedFailureModelSpec import org.opendc.simulator.kotlin.runSimulation import java.io.File import java.util.Random @@ -80,18 +81,18 @@ class ScenarioIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)), ) - workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 0L, 0L, 0.0) } /** - * Test a large simulation setup. + * Test a small simulation setup. */ @Test - fun testLarge() = + fun testSingleTask() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology("multi.json") + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") val monitor = monitor Provisioner(dispatcher, seed).use { provisioner -> @@ -116,16 +117,11 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101769345, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489430672, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(3.3388920269258898E7, monitor.powerDraw, 1E4) { "Incorrect power draw" } }, - { assertEquals(1.0016127451211525E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } @@ -133,12 +129,13 @@ class ScenarioIntegrationTest { * Test a small simulation setup. */ @Test - fun testSmall() = + fun testSingleTaskSingleFailure() = runSimulation { val seed = 1L - val workload = createTestWorkload(0.25, seed) + val workload = createTestWorkload("single_task", 1.0, seed) val topology = createTopology("single.json") val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/single_failure.parquet") Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( @@ -148,7 +145,7 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed = seed) + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) } println( @@ -162,24 +159,25 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(1373419781, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1217668222, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2539987.394500494, monitor.powerDraw, 1E4) { "Incorrect power draw" } }, - { assertEquals(7.617527900379665E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } /** - * Test a small simulation setup with interference. - * TODO: Interference is currently removed from OpenDC. Reactivate when interference is back in. + * Test a small simulation setup. */ - fun testInterference() = + @Test + fun testSingleTask11Failures() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( @@ -189,7 +187,7 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed = seed) + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) } println( @@ -203,22 +201,69 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } /** - * Test a small simulation setup with failures. - * FIXME: Currently failures do not work. reactivate this test when Failures are implemented again + * Test a small simulation setup. */ - fun testFailures() = + @Test + fun testSingleTaskCheckpoint() = runSimulation { - val seed = 0L + val seed = 1L + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 1000000L, 1000L, 1.0) + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, + { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. + */ + @Test + fun testSmall() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("bitbrains-small", 0.25, seed) val topology = createTopology("single.json") - val workload = createTestWorkload(0.25, seed) val monitor = monitor Provisioner(dispatcher, seed).use { provisioner -> @@ -229,16 +274,72 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed = seed, failureModelSpec = null) + service.replay(timeSource, workload, seed = seed) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(1803918601, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181585, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } }, + { assertEquals(6.756768E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a large simulation setup. + */ + @Test + fun testLarge() = + runSimulation { + val seed = 0L + val workload = createTestWorkload("bitbrains-small", 1.0, seed) + val topology = createTopology("multi.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, + { assertEquals(43101788258, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412702, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(1.0016592256E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } @@ -246,10 +347,11 @@ class ScenarioIntegrationTest { * Obtain the trace reader for the test. */ private fun createTestWorkload( + traceName: String, fraction: Double, seed: Long, - ): List { - val source = trace("bitbrains-small").sampleByLoad(fraction) + ): List { + val source = trace(traceName).sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed)) } @@ -257,7 +359,7 @@ class ScenarioIntegrationTest { * Obtain the topology factory for the test. */ private fun createTopology(name: String): List { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name")) + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name")) return stream.use { clusterTopology(stream) } } @@ -267,13 +369,17 @@ class ScenarioIntegrationTest { var attemptsError = 0 var tasksPending = 0 var tasksActive = 0 + var tasksTerminated = 0 + var tasksCompleted = 0 override fun record(reader: ServiceTableReader) { attemptsSuccess = reader.attemptsSuccess attemptsFailure = reader.attemptsFailure - attemptsError = reader.attemptsError + attemptsError = 0 tasksPending = reader.tasksPending tasksActive = reader.tasksActive + tasksTerminated = reader.tasksTerminated + tasksCompleted = reader.tasksCompleted } var idleTime = 0L diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json deleted file mode 100644 index c3a060cc..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 32, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 256000 - } - } - ] - }, - { - "name": "C02", - "hosts" : - [ - { - "name": "H02", - "count": 6, - "cpu": - { - "coreCount": 8, - "coreSpeed": 2930 - }, - "memory": { - "memorySize": 64000 - } - } - ] - }, - { - "name": "C03", - "hosts" : - [ - { - "name": "H03", - "count": 2, - "cpu": - { - "coreCount": 16, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 128000 - } - } - ] - } - ] -} - diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json deleted file mode 100644 index f69b21be..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": - { - "coreCount": 8, - "coreSpeed": 3200 - }, - "memory": { - "memorySize": 128000 - } - } - ] - } - ] -} - diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet new file mode 100644 index 00000000..dbd93acb Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet new file mode 100644 index 00000000..d1f8b853 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json new file mode 100644 index 00000000..c3a060cc --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json @@ -0,0 +1,59 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 32, + "coreSpeed": 3200 + }, + "memory": { + "memorySize": 256000 + } + } + ] + }, + { + "name": "C02", + "hosts" : + [ + { + "name": "H02", + "count": 6, + "cpu": + { + "coreCount": 8, + "coreSpeed": 2930 + }, + "memory": { + "memorySize": 64000 + } + } + ] + }, + { + "name": "C03", + "hosts" : + [ + { + "name": "H03", + "count": 2, + "cpu": + { + "coreCount": 16, + "coreSpeed": 3200 + }, + "memory": { + "memorySize": 128000 + } + } + ] + } + ] +} + diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json new file mode 100644 index 00000000..de66bfc2 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json @@ -0,0 +1,23 @@ +{ + "clusters": + [ + { + "name": "C01", + "hosts" : + [ + { + "name": "H01", + "cpu": + { + "coreCount": 12, + "coreSpeed": 3300, + "count": 1 + }, + "memory": { + "memorySize": 140457600000 + } + } + ] + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet deleted file mode 100644 index 240f58e3..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json deleted file mode 100644 index 51fc6366..00000000 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "vms": [ - "141", - "379", - "851", - "116" - ], - "minServerLoad": 0.0, - "performanceScore": 0.8830158730158756 - }, - { - "vms": [ - "205", - "116", - "463" - ], - "minServerLoad": 0.0, - "performanceScore": 0.7133055555552751 - } -] diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet deleted file mode 100644 index 8e9dcea7..00000000 Binary files a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet new file mode 100644 index 00000000..240f58e3 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json new file mode 100644 index 00000000..51fc6366 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json @@ -0,0 +1,21 @@ +[ + { + "vms": [ + "141", + "379", + "851", + "116" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "205", + "116", + "463" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet new file mode 100644 index 00000000..8e9dcea7 Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet new file mode 100644 index 00000000..94a2d69e Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet differ diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet new file mode 100644 index 00000000..2a7da2eb Binary files /dev/null and b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet differ -- cgit v1.2.3