diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-10-25 13:32:41 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-25 13:32:41 +0200 |
| commit | 5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch) | |
| tree | 72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-experiments/opendc-experiments-base/src | |
| parent | 27f5b7dcb05aefdab9b762175d538931face0aba (diff) | |
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests.
Improved checkpointing model
Improved model, started with SimPowerSource
implemented FailureModels and Checkpointing
First working version
midway commit
first update
All simulations are now run with a single CPU and a single MemoryUnit. Multiple CPUs are combined into one. This is for performance and explainability.
* fixed merge conflicts
* Updated M3SA paths.
* Fixed small typo
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src')
18 files changed, 203 insertions, 97 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index 49fa409e..c82e2557 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -29,13 +29,12 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.yield -import org.opendc.compute.api.Task import org.opendc.compute.api.TaskState -import org.opendc.compute.api.TaskWatcher import org.opendc.compute.failure.models.FailureModel -import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.VirtualMachine -import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec +import org.opendc.compute.simulator.TaskWatcher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.workload.Task import org.opendc.experiments.base.scenario.specs.FailureModelSpec import org.opendc.experiments.base.scenario.specs.createFailureModel import java.time.InstantSource @@ -50,7 +49,7 @@ import kotlin.math.max */ public class RunningTaskWatcher : TaskWatcher { // TODO: make this changeable - private val unlockStates: List<TaskState> = listOf(TaskState.DELETED, TaskState.TERMINATED) + private val unlockStates: List<TaskState> = listOf(TaskState.DELETED) private val mutex: Mutex = Mutex() @@ -63,7 +62,7 @@ public class RunningTaskWatcher : TaskWatcher { } override fun onStateChanged( - task: Task, + task: ServiceTask, newState: TaskState, ) { if (unlockStates.contains(newState)) { @@ -73,7 +72,7 @@ public class RunningTaskWatcher : TaskWatcher { } /** - * Helper method to 
replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * Helper method to replay the specified list of [Task] and suspend execution util all VMs have finished. * * @param clock The simulation clock. * @param trace The trace to simulate. @@ -83,9 +82,8 @@ public class RunningTaskWatcher : TaskWatcher { */ public suspend fun ComputeService.replay( clock: InstantSource, - trace: List<VirtualMachine>, + trace: List<Task>, failureModelSpec: FailureModelSpec? = null, - checkpointModelSpec: CheckpointModelSpec? = null, seed: Long = 0, submitImmediately: Boolean = false, ) { @@ -97,9 +95,6 @@ public suspend fun ComputeService.replay( createFailureModel(coroutineContext, clock, this, Random(seed), it) } - // Create new image for the virtual machine - val image = client.newImage("vm-image") - try { coroutineScope { // Start the fault injector @@ -107,9 +102,9 @@ public suspend fun ComputeService.replay( var simulationOffset = Long.MIN_VALUE - for (entry in trace.sortedBy { it.startTime }) { + for (entry in trace.sortedBy { it.submissionTime }) { val now = clock.millis() - val start = entry.startTime.toEpochMilli() + val start = entry.submissionTime.toEpochMilli() // Set the simulationOffset based on the starting time of the first task if (simulationOffset == Long.MIN_VALUE) { @@ -121,25 +116,21 @@ public suspend fun ComputeService.replay( delay(max(0, (start - now - simulationOffset))) } - val checkpointInterval = checkpointModelSpec?.checkpointInterval ?: 0L - val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L - val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 - - val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling) + val workload = entry.trace val meta = mutableMapOf<String, Any>("workload" to workload) launch { val task = client.newTask( entry.name, - image, client.newFlavor( entry.name, entry.cpuCount, 
entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), + if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), ), - meta = meta, + workload, + meta, ) val taskWatcher = RunningTaskWatcher() @@ -150,7 +141,7 @@ public suspend fun ComputeService.replay( taskWatcher.wait() // Stop the task after reaching the end-time of the virtual machine - task.delete() +// task.delete() } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 2bd9dfa3..df5aabf7 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -26,13 +26,13 @@ import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.carbon.getCarbonTrace -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.createComputeScheduler import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor +import org.opendc.compute.simulator.scheduler.createComputeScheduler +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.compute.workload.ComputeWorkloadLoader import 
org.opendc.experiments.base.scenario.Scenario @@ -89,14 +89,24 @@ public fun runScenario( { createComputeScheduler(scenario.allocationPolicySpec.policyType, Random(it.seeder.nextLong())) }, maxNumFailures = scenario.maxNumFailures, ), - setupHosts(serviceDomain, topology, optimize = true), + setupHosts(serviceDomain, topology), ) - val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile)) + val checkpointInterval = scenario.checkpointModelSpec?.checkpointInterval ?: 0L + val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L + val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 + + val workloadLoader = + ComputeWorkloadLoader( + File(scenario.workloadSpec.pathToFile), + checkpointInterval, + checkpointDuration, + checkpointIntervalScaling, + ) val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) val carbonTrace = getCarbonTrace(scenario.carbonTracePath) - val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli()) + val startTime = Duration.ofMillis(tasks.minOf { it.submissionTime }.toEpochMilli()) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! 
@@ -104,7 +114,6 @@ public fun runScenario( timeSource, tasks, failureModelSpec = scenario.failureModelSpec, - checkpointModelSpec = scenario.checkpointModelSpec, seed = seed, ) } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt index 160bd783..8ed60b08 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromStream -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.ExperimentSpec import java.io.File import java.io.InputStream diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt index 91cd09ba..f649e4f8 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.base.scenario -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec import 
org.opendc.experiments.base.scenario.specs.CheckpointModelSpec import org.opendc.experiments.base.scenario.specs.ExportModelSpec diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt index edfdfaf5..ddc11a50 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.service.scheduler.ComputeSchedulerEnum +import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum /** * specification describing how tasks are allocated diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt index 60fcf51a..b957ea18 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable import org.opendc.common.logger.infoNewLine import org.opendc.common.logger.logger -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import java.util.UUID /** diff --git 
a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt index a27e77bc..c20b4467 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt @@ -61,7 +61,7 @@ import org.opendc.compute.failure.models.SampleBasedFailureModel import org.opendc.compute.failure.models.TraceBasedFailureModel import org.opendc.compute.failure.prefab.FailurePrefab import org.opendc.compute.failure.prefab.createFailureModelPrefab -import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.service.ComputeService import java.io.File import java.time.InstantSource import kotlin.coroutines.CoroutineContext diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt index d7fdb8f4..8f2146f1 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig @Serializable public data class ScenarioSpec( diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt 
b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt index 08eddca0..41d18225 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt @@ -26,26 +26,27 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.telemetry.ComputeMonitor -import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader import 
org.opendc.compute.topology.clusterTopology import org.opendc.compute.topology.specs.HostSpec import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import org.opendc.compute.workload.sampleByLoad import org.opendc.compute.workload.trace import org.opendc.experiments.base.runner.replay +import org.opendc.experiments.base.scenario.specs.TraceBasedFailureModelSpec import org.opendc.simulator.kotlin.runSimulation import java.io.File import java.util.Random @@ -80,18 +81,18 @@ class ScenarioIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)), ) - workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 0L, 0L, 0.0) } /** - * Test a large simulation setup. + * Test a small simulation setup. */ @Test - fun testLarge() = + fun testSingleTask() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) - val topology = createTopology("multi.json") + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") val monitor = monitor Provisioner(dispatcher, seed).use { provisioner -> @@ -116,16 +117,11 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, - { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101769345, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489430672, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(0, monitor.stealTime) { 
"Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(3.3388920269258898E7, monitor.powerDraw, 1E4) { "Incorrect power draw" } }, - { assertEquals(1.0016127451211525E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + { assertEquals(0, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(3000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1200000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } @@ -133,12 +129,13 @@ class ScenarioIntegrationTest { * Test a small simulation setup. */ @Test - fun testSmall() = + fun testSingleTaskSingleFailure() = runSimulation { val seed = 1L - val workload = createTestWorkload(0.25, seed) + val workload = createTestWorkload("single_task", 1.0, seed) val topology = createTopology("single.json") val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/single_failure.parquet") Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( @@ -148,7 +145,7 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) } println( @@ -162,24 +159,25 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(1373419781, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1217668222, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(2200000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5000000, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2539987.394500494, monitor.powerDraw, 1E4) { "Incorrect power draw" } }, - { assertEquals(7.617527900379665E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + { assertEquals(2440000.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } /** - * Test a small simulation setup with interference. - * TODO: Interference is currently removed from OpenDC. Reactivate when interference is back in. + * Test a small simulation setup. */ - fun testInterference() = + @Test + fun testSingleTask11Failures() = runSimulation { - val seed = 0L - val workload = createTestWorkload(1.0, seed) + val seed = 1L + val workload = createTestWorkload("single_task", 1.0, seed) val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( @@ -189,7 +187,7 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload, seed = seed) + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) } println( @@ -203,22 +201,69 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(42814948316, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(40138266225, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(23489356981, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(1, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(18100000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(20000000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(1.162E7, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } /** - * Test a small simulation setup with failures. - * FIXME: Currently failures do not work. reactivate this test when Failures are implemented again + * Test a small simulation setup. 
*/ - fun testFailures() = + @Test + fun testSingleTaskCheckpoint() = runSimulation { - val seed = 0L + val seed = 1L + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/traces"), 1000000L, 1000L, 1.0) + val workload = createTestWorkload("single_task", 1.0, seed) + val topology = createTopology("single.json") + val monitor = monitor + val failureModelSpec = TraceBasedFailureModelSpec("src/test/resources/failureTraces/11_failures.parquet") + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, failureModelSpec = failureModelSpec, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, + { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, + { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a small simulation setup. 
+ */ + @Test + fun testSmall() = + runSimulation { + val seed = 1L + val workload = createTestWorkload("bitbrains-small", 0.25, seed) val topology = createTopology("single.json") - val workload = createTestWorkload(0.25, seed) val monitor = monitor Provisioner(dispatcher, seed).use { provisioner -> @@ -229,16 +274,72 @@ class ScenarioIntegrationTest { ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! - service.replay(timeSource, workload, seed = seed, failureModelSpec = null) + service.replay(timeSource, workload, seed = seed) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(1404277711, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(1478675712, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(152, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(1803918601, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181585, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(360369187, monitor.uptime) { "Uptime incorrect" } }, + { assertEquals(6.756768E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, + ) + } + + /** + * Test a large simulation setup. 
+ */ + @Test + fun testLarge() = + runSimulation { + val seed = 0L + val workload = createTestWorkload("bitbrains-small", 1.0, seed) + val topology = createTopology("multi.json") + val monitor = monitor + + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload, seed = seed) + } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(50, monitor.tasksCompleted, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksTerminated, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, + { assertEquals(43101788258, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412702, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(1.0016592256E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) } @@ -246,10 +347,11 @@ class ScenarioIntegrationTest { * Obtain the trace reader for the test. 
*/ private fun createTestWorkload( + traceName: String, fraction: Double, seed: Long, - ): List<VirtualMachine> { - val source = trace("bitbrains-small").sampleByLoad(fraction) + ): List<Task> { + val source = trace(traceName).sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed)) } @@ -257,7 +359,7 @@ class ScenarioIntegrationTest { * Obtain the topology factory for the test. */ private fun createTopology(name: String): List<HostSpec> { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name")) + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/topologies/$name")) return stream.use { clusterTopology(stream) } } @@ -267,13 +369,17 @@ class ScenarioIntegrationTest { var attemptsError = 0 var tasksPending = 0 var tasksActive = 0 + var tasksTerminated = 0 + var tasksCompleted = 0 override fun record(reader: ServiceTableReader) { attemptsSuccess = reader.attemptsSuccess attemptsFailure = reader.attemptsFailure - attemptsError = reader.attemptsError + attemptsError = 0 tasksPending = reader.tasksPending tasksActive = reader.tasksActive + tasksTerminated = reader.tasksTerminated + tasksCompleted = reader.tasksCompleted } var idleTime = 0L diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet Binary files differnew file mode 100644 index 00000000..dbd93acb --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/11_failures.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet Binary files differnew file mode 100644 index 00000000..d1f8b853 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/failureTraces/single_failure.parquet 
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json index c3a060cc..c3a060cc 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/env/multi.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/multi.json diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json index f69b21be..de66bfc2 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/env/single.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/topologies/single.json @@ -9,15 +9,15 @@ "name": "H01", "cpu": { - "coreCount": 8, - "coreSpeed": 3200 + "coreCount": 12, + "coreSpeed": 3300, + "count": 1 }, "memory": { - "memorySize": 128000 + "memorySize": 140457600000 } } ] } ] } - diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet Binary files differindex 240f58e3..240f58e3 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/fragments.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json index 51fc6366..51fc6366 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/interference-model.json +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/interference-model.json diff --git 
a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet Binary files differindex 8e9dcea7..8e9dcea7 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/bitbrains-small/tasks.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet Binary files differnew file mode 100644 index 00000000..94a2d69e --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/fragments.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet Binary files differnew file mode 100644 index 00000000..2a7da2eb --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/traces/single_task/tasks.parquet |
