diff options
| author | Sacheendra Talluri <sacheendra.t@gmail.com> | 2025-03-27 16:14:39 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-27 16:14:39 +0100 |
| commit | b20dd5ebb48465470b9632dc92ecfb1794a8a4bf (patch) | |
| tree | 0aeab9ad4b366cfe9e8aa79f841563e7f91758ab /opendc-experiments/opendc-experiments-base | |
| parent | ea45406229c8349e44c88f4112fe25435b59e4e9 (diff) | |
Support carbon forecasting in timeshift (#327)
* Remove task from scheduler bookkeeping after failure
* Support carbon forecasting in timeshift
* Register scheduler and carbonmodel in context
* Preliminary working task stopping; carbon intensity bug
* Working carbon based stop. Two timeshift thresholds
* Add a pause state task and guest
* Move task stopper to allocation spec
* Start tracking num pauses
Diffstat (limited to 'opendc-experiments/opendc-experiments-base')
4 files changed, 72 insertions, 9 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt index 7d24c55d..d9df6511 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt @@ -26,6 +26,7 @@ import kotlinx.serialization.Serializable import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum import org.opendc.experiments.base.experiment.specs.allocation.AllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.PrefabAllocationPolicySpec +import org.opendc.experiments.base.experiment.specs.allocation.TaskStopperSpec @Serializable public data class ScenarioSpec( @@ -39,4 +40,5 @@ public data class ScenarioSpec( val failureModel: FailureModelSpec? = null, val checkpointModel: CheckpointModelSpec? = null, val maxNumFailures: Int = 10, + val taskStopper: TaskStopperSpec? = null, ) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt index 686bb84e..21d2f994 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt @@ -27,10 +27,12 @@ import kotlinx.serialization.Serializable import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.TimeshiftScheduler import org.opendc.compute.simulator.scheduler.createPrefabComputeScheduler +import org.opendc.compute.simulator.scheduler.timeshift.TaskStopper +import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler import java.time.InstantSource import java.util.random.RandomGenerator +import kotlin.coroutines.CoroutineContext /** * specification describing how tasks are allocated @@ -61,7 +63,11 @@ public data class TimeShiftAllocationPolicySpec( val weighers: List<HostWeigherSpec>, val windowSize: Int = 168, val subsetSize: Int = 1, - val peakShift: Boolean = true, + val forecast: Boolean = true, + val shortForecastThreshold: Double = 0.2, + val longForecastThreshold: Double = 0.35, + val forecastSize: Int = 24, + val taskStopper: TaskStopperSpec? = null, ) : AllocationPolicySpec public fun createComputeScheduler( @@ -79,7 +85,41 @@ public fun createComputeScheduler( is TimeShiftAllocationPolicySpec -> { val filters = spec.filters.map { createHostFilter(it) } val weighers = spec.weighers.map { createHostWeigher(it) } - TimeshiftScheduler(filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.peakShift, seeder) + TimeshiftScheduler( + filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.forecast, + spec.shortForecastThreshold, spec.longForecastThreshold, spec.forecastSize, seeder, + ) } } } + +@Serializable +@SerialName("taskstopper") +public data class TaskStopperSpec( + val forecast: Boolean = true, + val forecastThreshold: Double = 0.6, + val forecastSize: Int = 24, + val windowSize: Int = 168, +) + +public fun createTaskStopper( + spec: TaskStopperSpec?, + context: CoroutineContext, + clock: InstantSource, +): TaskStopper? { + val taskStopper = + if (spec != null) { + TaskStopper( + clock, + context, + spec.forecast, + spec.forecastThreshold, + spec.forecastSize, + spec.windowSize, + ) + } else { + null + } + + return taskStopper +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 43bab2f5..15445450 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -28,13 +28,17 @@ import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.experiments.base.experiment.Scenario +import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler +import org.opendc.experiments.base.experiment.specs.allocation.createTaskStopper import org.opendc.experiments.base.experiment.specs.getScalingPolicy import org.opendc.experiments.base.experiment.specs.getWorkloadLoader +import org.opendc.simulator.compute.power.CarbonModel import org.opendc.simulator.compute.power.CarbonReceiver import org.opendc.simulator.kotlin.runSimulation import java.io.File @@ -100,7 +104,6 @@ public fun runScenario( val startTimeLong = workload.minOf { it.submissionTime } val startTime = Duration.ofMillis(startTimeLong) - val carbonReceivers = mutableListOf<CarbonReceiver>() val topology = clusterTopology(scenario.topologySpec.pathToFile) provisioner.runSteps( setupComputeService( @@ -113,15 +116,13 @@ public fun runScenario( timeSource, ) - if (computeScheduler is CarbonReceiver) { - carbonReceivers.add(computeScheduler) - } + provisioner.registry.register(serviceDomain, ComputeScheduler::class.java, computeScheduler) return@setupComputeService computeScheduler }, maxNumFailures = scenario.maxNumFailures, ), - setupHosts(serviceDomain, topology, carbonReceivers, startTimeLong), + setupHosts(serviceDomain, topology, startTimeLong), ) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, scenario.id) @@ -130,6 +131,26 @@ public fun runScenario( service.setTasksExpected(workload.size) service.setMetricReader(provisioner.getMonitor()) + val carbonModel = provisioner.registry.resolve(serviceDomain, CarbonModel::class.java)!! + val computeScheduler = provisioner.registry.resolve(serviceDomain, ComputeScheduler::class.java)!! + if (computeScheduler is CarbonReceiver) { + carbonModel.addReceiver(computeScheduler) + } + carbonModel.addReceiver(service) + + if (scenario.allocationPolicySpec is TimeShiftAllocationPolicySpec) { + val taskStopper = + createTaskStopper( + scenario.allocationPolicySpec.taskStopper, + coroutineContext, + timeSource, + ) + if (taskStopper != null) { + taskStopper.setService(service) + carbonModel.addReceiver(taskStopper) + } + } + service.replay( timeSource, workload, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt index 03a24459..eadb74e7 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -117,7 +117,7 @@ fun runTest( provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1), startTime), - setupHosts(serviceDomain = "compute.opendc.org", topology, listOf(), startTimeLong), + setupHosts(serviceDomain = "compute.opendc.org", topology, startTimeLong), ) val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! |
