summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-base
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-base')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt46
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt31
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt2
4 files changed, 72 insertions, 9 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt
index 7d24c55d..d9df6511 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt
@@ -26,6 +26,7 @@ import kotlinx.serialization.Serializable
import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum
import org.opendc.experiments.base.experiment.specs.allocation.AllocationPolicySpec
import org.opendc.experiments.base.experiment.specs.allocation.PrefabAllocationPolicySpec
+import org.opendc.experiments.base.experiment.specs.allocation.TaskStopperSpec
@Serializable
public data class ScenarioSpec(
@@ -39,4 +40,5 @@ public data class ScenarioSpec(
val failureModel: FailureModelSpec? = null,
val checkpointModel: CheckpointModelSpec? = null,
val maxNumFailures: Int = 10,
+ val taskStopper: TaskStopperSpec? = null,
)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt
index 686bb84e..21d2f994 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/allocation/AllocationPolicySpec.kt
@@ -27,10 +27,12 @@ import kotlinx.serialization.Serializable
import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.scheduler.ComputeSchedulerEnum
import org.opendc.compute.simulator.scheduler.FilterScheduler
-import org.opendc.compute.simulator.scheduler.TimeshiftScheduler
import org.opendc.compute.simulator.scheduler.createPrefabComputeScheduler
+import org.opendc.compute.simulator.scheduler.timeshift.TaskStopper
+import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler
import java.time.InstantSource
import java.util.random.RandomGenerator
+import kotlin.coroutines.CoroutineContext
/**
* specification describing how tasks are allocated
@@ -61,7 +63,11 @@ public data class TimeShiftAllocationPolicySpec(
val weighers: List<HostWeigherSpec>,
val windowSize: Int = 168,
val subsetSize: Int = 1,
- val peakShift: Boolean = true,
+ val forecast: Boolean = true,
+ val shortForecastThreshold: Double = 0.2,
+ val longForecastThreshold: Double = 0.35,
+ val forecastSize: Int = 24,
+ val taskStopper: TaskStopperSpec? = null,
) : AllocationPolicySpec
public fun createComputeScheduler(
@@ -79,7 +85,41 @@ public fun createComputeScheduler(
is TimeShiftAllocationPolicySpec -> {
val filters = spec.filters.map { createHostFilter(it) }
val weighers = spec.weighers.map { createHostWeigher(it) }
- TimeshiftScheduler(filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.peakShift, seeder)
+ TimeshiftScheduler(
+ filters, weighers, spec.windowSize, clock, spec.subsetSize, spec.forecast,
+ spec.shortForecastThreshold, spec.longForecastThreshold, spec.forecastSize, seeder,
+ )
}
}
}
+
+@Serializable
+@SerialName("taskstopper")
+public data class TaskStopperSpec(
+ val forecast: Boolean = true,
+ val forecastThreshold: Double = 0.6,
+ val forecastSize: Int = 24,
+ val windowSize: Int = 168,
+)
+
+public fun createTaskStopper(
+ spec: TaskStopperSpec?,
+ context: CoroutineContext,
+ clock: InstantSource,
+): TaskStopper? {
+ val taskStopper =
+ if (spec != null) {
+ TaskStopper(
+ clock,
+ context,
+ spec.forecast,
+ spec.forecastThreshold,
+ spec.forecastSize,
+ spec.windowSize,
+ )
+ } else {
+ null
+ }
+
+ return taskStopper
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 43bab2f5..15445450 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -28,13 +28,17 @@ import org.opendc.compute.simulator.provisioner.Provisioner
import org.opendc.compute.simulator.provisioner.registerComputeMonitor
import org.opendc.compute.simulator.provisioner.setupComputeService
import org.opendc.compute.simulator.provisioner.setupHosts
+import org.opendc.compute.simulator.scheduler.ComputeScheduler
import org.opendc.compute.simulator.service.ComputeService
import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor
import org.opendc.compute.topology.clusterTopology
import org.opendc.experiments.base.experiment.Scenario
+import org.opendc.experiments.base.experiment.specs.allocation.TimeShiftAllocationPolicySpec
import org.opendc.experiments.base.experiment.specs.allocation.createComputeScheduler
+import org.opendc.experiments.base.experiment.specs.allocation.createTaskStopper
import org.opendc.experiments.base.experiment.specs.getScalingPolicy
import org.opendc.experiments.base.experiment.specs.getWorkloadLoader
+import org.opendc.simulator.compute.power.CarbonModel
import org.opendc.simulator.compute.power.CarbonReceiver
import org.opendc.simulator.kotlin.runSimulation
import java.io.File
@@ -100,7 +104,6 @@ public fun runScenario(
val startTimeLong = workload.minOf { it.submissionTime }
val startTime = Duration.ofMillis(startTimeLong)
- val carbonReceivers = mutableListOf<CarbonReceiver>()
val topology = clusterTopology(scenario.topologySpec.pathToFile)
provisioner.runSteps(
setupComputeService(
@@ -113,15 +116,13 @@ public fun runScenario(
timeSource,
)
- if (computeScheduler is CarbonReceiver) {
- carbonReceivers.add(computeScheduler)
- }
+ provisioner.registry.register(serviceDomain, ComputeScheduler::class.java, computeScheduler)
return@setupComputeService computeScheduler
},
maxNumFailures = scenario.maxNumFailures,
),
- setupHosts(serviceDomain, topology, carbonReceivers, startTimeLong),
+ setupHosts(serviceDomain, topology, startTimeLong),
)
addExportModel(provisioner, serviceDomain, scenario, seed, startTime, scenario.id)
@@ -130,6 +131,26 @@ public fun runScenario(
service.setTasksExpected(workload.size)
service.setMetricReader(provisioner.getMonitor())
+ val carbonModel = provisioner.registry.resolve(serviceDomain, CarbonModel::class.java)!!
+ val computeScheduler = provisioner.registry.resolve(serviceDomain, ComputeScheduler::class.java)!!
+ if (computeScheduler is CarbonReceiver) {
+ carbonModel.addReceiver(computeScheduler)
+ }
+ carbonModel.addReceiver(service)
+
+ if (scenario.allocationPolicySpec is TimeShiftAllocationPolicySpec) {
+ val taskStopper =
+ createTaskStopper(
+ scenario.allocationPolicySpec.taskStopper,
+ coroutineContext,
+ timeSource,
+ )
+ if (taskStopper != null) {
+ taskStopper.setService(service)
+ carbonModel.addReceiver(taskStopper)
+ }
+ }
+
service.replay(
timeSource,
workload,
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
index 03a24459..eadb74e7 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt
@@ -117,7 +117,7 @@ fun runTest(
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1), startTime),
- setupHosts(serviceDomain = "compute.opendc.org", topology, listOf(), startTimeLong),
+ setupHosts(serviceDomain = "compute.opendc.org", topology, startTimeLong),
)
val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!!