diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/main')
8 files changed, 37 insertions, 37 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index 49fa409e..c82e2557 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -29,13 +29,12 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.yield -import org.opendc.compute.api.Task import org.opendc.compute.api.TaskState -import org.opendc.compute.api.TaskWatcher import org.opendc.compute.failure.models.FailureModel -import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.VirtualMachine -import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec +import org.opendc.compute.simulator.TaskWatcher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.compute.workload.Task import org.opendc.experiments.base.scenario.specs.FailureModelSpec import org.opendc.experiments.base.scenario.specs.createFailureModel import java.time.InstantSource @@ -50,7 +49,7 @@ import kotlin.math.max */ public class RunningTaskWatcher : TaskWatcher { // TODO: make this changeable - private val unlockStates: List<TaskState> = listOf(TaskState.DELETED, TaskState.TERMINATED) + private val unlockStates: List<TaskState> = listOf(TaskState.DELETED) private val mutex: Mutex = Mutex() @@ -63,7 +62,7 @@ public class RunningTaskWatcher : TaskWatcher { } override fun onStateChanged( - task: Task, + task: ServiceTask, newState: TaskState, ) { if (unlockStates.contains(newState)) { @@ -73,7 +72,7 @@ public class RunningTaskWatcher : TaskWatcher { } /** - * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * Helper method to replay the specified list of [Task] and suspend execution util all VMs have finished. * * @param clock The simulation clock. * @param trace The trace to simulate. @@ -83,9 +82,8 @@ public class RunningTaskWatcher : TaskWatcher { */ public suspend fun ComputeService.replay( clock: InstantSource, - trace: List<VirtualMachine>, + trace: List<Task>, failureModelSpec: FailureModelSpec? = null, - checkpointModelSpec: CheckpointModelSpec? = null, seed: Long = 0, submitImmediately: Boolean = false, ) { @@ -97,9 +95,6 @@ public suspend fun ComputeService.replay( createFailureModel(coroutineContext, clock, this, Random(seed), it) } - // Create new image for the virtual machine - val image = client.newImage("vm-image") - try { coroutineScope { // Start the fault injector @@ -107,9 +102,9 @@ public suspend fun ComputeService.replay( var simulationOffset = Long.MIN_VALUE - for (entry in trace.sortedBy { it.startTime }) { + for (entry in trace.sortedBy { it.submissionTime }) { val now = clock.millis() - val start = entry.startTime.toEpochMilli() + val start = entry.submissionTime.toEpochMilli() // Set the simulationOffset based on the starting time of the first task if (simulationOffset == Long.MIN_VALUE) { @@ -121,25 +116,21 @@ public suspend fun ComputeService.replay( delay(max(0, (start - now - simulationOffset))) } - val checkpointInterval = checkpointModelSpec?.checkpointInterval ?: 0L - val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L - val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 - - val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling) + val workload = entry.trace val meta = mutableMapOf<String, Any>("workload" to workload) launch { val task = client.newTask( entry.name, - image, client.newFlavor( entry.name, entry.cpuCount, entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), + if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), ), - meta = meta, + workload, + meta, ) val taskWatcher = RunningTaskWatcher() @@ -150,7 +141,7 @@ public suspend fun ComputeService.replay( taskWatcher.wait() // Stop the task after reaching the end-time of the virtual machine - task.delete() +// task.delete() } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 2bd9dfa3..df5aabf7 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -26,13 +26,13 @@ import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.carbon.getCarbonTrace -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.createComputeScheduler import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor +import org.opendc.compute.simulator.scheduler.createComputeScheduler +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.experiments.base.scenario.Scenario @@ -89,14 +89,24 @@ public fun runScenario( { createComputeScheduler(scenario.allocationPolicySpec.policyType, Random(it.seeder.nextLong())) }, maxNumFailures = scenario.maxNumFailures, ), - setupHosts(serviceDomain, topology, optimize = true), + setupHosts(serviceDomain, topology), ) - val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile)) + val checkpointInterval = scenario.checkpointModelSpec?.checkpointInterval ?: 0L + val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L + val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 + + val workloadLoader = + ComputeWorkloadLoader( + File(scenario.workloadSpec.pathToFile), + checkpointInterval, + checkpointDuration, + checkpointIntervalScaling, + ) val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) val carbonTrace = getCarbonTrace(scenario.carbonTracePath) - val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli()) + val startTime = Duration.ofMillis(tasks.minOf { it.submissionTime }.toEpochMilli()) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! @@ -104,7 +114,6 @@ public fun runScenario( timeSource, tasks, failureModelSpec = scenario.failureModelSpec, - checkpointModelSpec = scenario.checkpointModelSpec, seed = seed, ) } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt index 160bd783..8ed60b08 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ExperimentReader.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromStream -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.ExperimentSpec import java.io.File import java.io.InputStream diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt index 91cd09ba..f649e4f8 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.base.scenario -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec import org.opendc.experiments.base.scenario.specs.ExportModelSpec diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt index edfdfaf5..ddc11a50 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/AllocationPolicySpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.service.scheduler.ComputeSchedulerEnum +import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum /** * specification describing how tasks are allocated diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt index 60fcf51a..b957ea18 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ExperimentSpec.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable import org.opendc.common.logger.infoNewLine import org.opendc.common.logger.logger -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import java.util.UUID /** diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt index a27e77bc..c20b4467 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/FailureModelSpec.kt @@ -61,7 +61,7 @@ import org.opendc.compute.failure.models.SampleBasedFailureModel import org.opendc.compute.failure.models.TraceBasedFailureModel import org.opendc.compute.failure.prefab.FailurePrefab import org.opendc.compute.failure.prefab.createFailureModelPrefab -import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.service.ComputeService import java.io.File import java.time.InstantSource import kotlin.coroutines.CoroutineContext diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt index d7fdb8f4..8f2146f1 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenarioSpec.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.base.scenario.specs import kotlinx.serialization.Serializable -import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig @Serializable public data class ScenarioSpec( |
