diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-base/src/main/kotlin')
3 files changed, 71 insertions, 8 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt index 7d24c55d..d9df6511 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt @@ -26,6 +26,7 @@ import kotlinx.serialization.Serializable import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum import org.opendc.experiments.base.experiment.specs.allocation.AllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.PrefabAllocationPolicySpec +import org.opendc.experiments.base.experiment.specs.allocation.TaskStopperSpec @Serializable public data class ScenarioSpec( @@ -39,4 +40,5 @@ public data class ScenarioSpec( val failureModel: FailureModelSpec? = null, val checkpointModel: CheckpointModelSpec? = null, val maxNumFailures: Int = 10, + val taskStopper: TaskStopperSpec? = null, ) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt index 686bb84e..21d2f994 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt @@ -27,10 +27,12 @@ import kotlinx.serialization.Serializable import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.TimeshiftScheduler import org.opendc.compute.simulator.scheduler.createPrefabComputeScheduler +import org.opendc.compute.simulator.scheduler.timeshift.TaskStopper +import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler import java.time.InstantSource import java.util.random.RandomGenerator +import kotlin.coroutines.CoroutineContext /** * specification describing how tasks are allocated @@ -61,7 +63,11 @@ public data class TimeShiftAllocationPolicySpec( val weighers: List<HostWeigherSpec>, val windowSize: Int = 168, val subsetSize: Int = 1, - val peakShift: Boolean = true, + val forecast: Boolean = true, + val shortForecastThreshold: Double = 0.2, + val longForecastThreshold: Double = 0.35, + val forecastSize: Int = 24, + val taskStopper: TaskStopperSpec? = null, ) : AllocationPolicySpec public fun createComputeScheduler( @@ -79,7 +85,41 @@ public fun createComputeScheduler( is TimeShiftAllocationPolicySpec -> { val filters = spec.filters.map { createHostFilter(it) } val weighers = spec.weighers.map { createHostWeigher(it) } - TimeshiftScheduler(filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.peakShift, seeder) + TimeshiftScheduler( + filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.forecast, + spec.shortForecastThreshold, spec.longForecastThreshold, spec.forecastSize, seeder, + ) } } } + +@Serializable +@SerialName("taskstopper") +public data class TaskStopperSpec( + val forecast: Boolean = true, + val forecastThreshold: Double = 0.6, + val forecastSize: Int = 24, + val windowSize: Int = 168, +) + +public fun createTaskStopper( + spec: TaskStopperSpec?, + context: CoroutineContext, + clock: InstantSource, +): TaskStopper? { + val taskStopper = + if (spec != null) { + TaskStopper( + clock, + context, + spec.forecast, + spec.forecastThreshold, + spec.forecastSize, + spec.windowSize, + ) + } else { + null + } + + return taskStopper +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 43bab2f5..15445450 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -28,13 +28,17 @@ import org.opendc.compute.simulator.provisioner.Provisioner import org.opendc.compute.simulator.provisioner.registerComputeMonitor import org.opendc.compute.simulator.provisioner.setupComputeService import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.experiments.base.experiment.Scenario +import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler +import org.opendc.experiments.base.experiment.specs.allocation.createTaskStopper import org.opendc.experiments.base.experiment.specs.getScalingPolicy import org.opendc.experiments.base.experiment.specs.getWorkloadLoader +import org.opendc.simulator.compute.power.CarbonModel import org.opendc.simulator.compute.power.CarbonReceiver import org.opendc.simulator.kotlin.runSimulation import java.io.File @@ -100,7 +104,6 @@ public fun runScenario( val startTimeLong = workload.minOf { it.submissionTime } val startTime = Duration.ofMillis(startTimeLong) - val carbonReceivers = mutableListOf<CarbonReceiver>() val topology = clusterTopology(scenario.topologySpec.pathToFile) provisioner.runSteps( setupComputeService( @@ -113,15 +116,13 @@ public fun runScenario( timeSource, ) - if (computeScheduler is CarbonReceiver) { - carbonReceivers.add(computeScheduler) - } + provisioner.registry.register(serviceDomain, ComputeScheduler::class.java, computeScheduler) return@setupComputeService computeScheduler }, maxNumFailures = scenario.maxNumFailures, ), - setupHosts(serviceDomain, topology, carbonReceivers, startTimeLong), + setupHosts(serviceDomain, topology, startTimeLong), ) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, scenario.id) @@ -130,6 +131,26 @@ public fun runScenario( service.setTasksExpected(workload.size) service.setMetricReader(provisioner.getMonitor()) + val carbonModel = provisioner.registry.resolve(serviceDomain, CarbonModel::class.java)!! + val computeScheduler = provisioner.registry.resolve(serviceDomain, ComputeScheduler::class.java)!! + if (computeScheduler is CarbonReceiver) { + carbonModel.addReceiver(computeScheduler) + } + carbonModel.addReceiver(service) + + if (scenario.allocationPolicySpec is TimeShiftAllocationPolicySpec) { + val taskStopper = + createTaskStopper( + scenario.allocationPolicySpec.taskStopper, + coroutineContext, + timeSource, + ) + if (taskStopper != null) { + taskStopper.setService(service) + carbonModel.addReceiver(taskStopper) + } + } + service.replay( timeSource, workload, |
