summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-simulator
diff options
context:
space:
mode:
authorSacheendra Talluri <sacheendra.t@gmail.com>2025-03-27 16:14:39 +0100
committerGitHub <noreply@github.com>2025-03-27 16:14:39 +0100
commitb20dd5ebb48465470b9632dc92ecfb1794a8a4bf (patch)
tree0aeab9ad4b366cfe9e8aa79f841563e7f91758ab /opendc-compute/opendc-compute-simulator
parentea45406229c8349e44c88f4112fe25435b59e4e9 (diff)
Support carbon forecasting in timeshift (#327)
* Remove task from scheduler bookkeeping after failure * Support carbon forecasting in timeshift * Register scheduler and carbonmodel in context * Preliminary working task stopping; carbon intensity bug * Working carbon based stop. Two timeshift thresholds * Add a pause state task and guest * Move task stopper to allocation spec * Start tracking num pauses
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java13
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java20
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt47
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt62
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt8
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt113
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt (renamed from opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt)73
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt5
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt3
13 files changed, 271 insertions, 87 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
index b4b34881..fb9dc6e6 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator.service;
import java.time.Duration;
import java.time.InstantSource;
-import java.time.temporal.TemporalAmount;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -186,7 +185,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
task.setState(newState);
- if (newState == TaskState.COMPLETED || newState == TaskState.TERMINATED || newState == TaskState.FAILED) {
+ if (newState == TaskState.COMPLETED
+ || newState == TaskState.PAUSED
+ || newState == TaskState.TERMINATED
+ || newState == TaskState.FAILED) {
LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor());
if (activeTasks.remove(task) != null) {
@@ -204,7 +206,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
}
task.setHost(null);
- host.removeTask(task);
+ host.delete(task);
if (newState == TaskState.COMPLETED) {
tasksCompleted++;
@@ -214,10 +216,11 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
}
if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
- scheduler.removeTask(task, hv);
setTaskToBeRemoved(task);
}
+ scheduler.removeTask(task, hv);
+
// Try to reschedule if needed
requestSchedulingCycle();
}
@@ -672,7 +675,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
public ServiceTask newTask(
@NotNull String name,
@NotNull TaskNature nature,
- @NotNull TemporalAmount duration,
+ @NotNull Duration duration,
@NotNull Long deadline,
@NotNull ServiceFlavor flavor,
@NotNull Workload workload,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java
index cada796a..6d097efb 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java
+++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java
@@ -22,8 +22,8 @@
package org.opendc.compute.simulator.service;
+import java.time.Duration;
import java.time.Instant;
-import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -51,7 +51,7 @@ public class ServiceTask {
private final String name;
private final TaskNature nature;
- private final TemporalAmount duration;
+ private final Duration duration;
private final Long deadline;
private ServiceFlavor flavor;
public Workload workload;
@@ -67,13 +67,14 @@ public class ServiceTask {
private SchedulingRequest request = null;
private int numFailures = 0;
+ private int numPauses = 0;
ServiceTask(
ComputeService service,
UUID uid,
String name,
TaskNature nature,
- TemporalAmount duration,
+ Duration duration,
Long deadline,
ServiceFlavor flavor,
Workload workload,
@@ -102,7 +103,7 @@ public class ServiceTask {
}
@NotNull
- public TemporalAmount getDuration() {
+ public Duration getDuration() {
return duration;
}
@@ -165,6 +166,10 @@ public class ServiceTask {
return this.numFailures;
}
+ public int getNumPauses() {
+ return this.numPauses;
+ }
+
public void start() {
switch (state) {
case PROVISIONING:
@@ -182,6 +187,11 @@ public class ServiceTask {
assert request == null : "Scheduling request already active";
request = service.schedule(this);
break;
+ case PAUSED:
+ LOGGER.info("User requested to start task after pause {}", uid);
+ setState(TaskState.PROVISIONING);
+ request = service.schedule(this, true);
+ break;
case FAILED:
LOGGER.info("User requested to start task after failure {}", uid);
setState(TaskState.PROVISIONING);
@@ -237,6 +247,8 @@ public class ServiceTask {
}
if (newState == TaskState.FAILED) {
this.numFailures++;
+ } else if (newState == TaskState.PAUSED) {
+ this.numPauses++;
}
if ((newState == TaskState.COMPLETED) || newState == TaskState.FAILED) {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
index 9c58f7ab..d23794ab 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt
@@ -71,7 +71,7 @@ public class SimHost(
* The virtual machines running on the hypervisor.
*/
private val taskToGuestMap = HashMap<ServiceTask, Guest>()
- private val guests = mutableListOf<Guest>()
+ private val guests = mutableSetOf<Guest>()
private var hostState: HostState = HostState.DOWN
set(value) {
@@ -151,12 +151,20 @@ public class SimHost(
// Fail the guest and delete them
// This weird loop is the only way I have been able to make it work.
while (guests.size > 0) {
- val guest = guests[0]
+ val guest = guests.first()
guest.fail()
this.delete(guest.task)
}
}
+ public fun pauseAllTasks() {
+ while (guests.size > 0) {
+ val guest = guests.first()
+ guest.pause()
+ this.delete(guest.task)
+ }
+ }
+
public fun recover() {
updateUptime()
@@ -194,7 +202,7 @@ public class SimHost(
}
public fun getGuests(): List<Guest> {
- return this.guests
+ return this.guests.toList()
}
public fun canFit(task: ServiceTask): Boolean {
@@ -239,28 +247,19 @@ public class SimHost(
guest.start()
}
- public fun stop(task: ServiceTask) {
- val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" }
- guest.stop()
- }
+// public fun stop(task: ServiceTask) {
+// val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" }
+// guest.stop()
+// }
public fun delete(task: ServiceTask) {
val guest = taskToGuestMap[task] ?: return
- guest.delete()
taskToGuestMap.remove(task)
guests.remove(guest)
task.host = null
}
- public fun removeTask(task: ServiceTask) {
- val guest = taskToGuestMap[task] ?: return
- guest.delete()
-
- taskToGuestMap.remove(task)
- guests.remove(guest)
- }
-
public fun addListener(listener: HostListener) {
hostListeners.add(listener)
}
@@ -281,16 +280,21 @@ public class SimHost(
var invalid = 0
var completed = 0
- val guests = guests.listIterator()
for (guest in guests) {
when (guest.state) {
TaskState.RUNNING -> running++
- TaskState.COMPLETED, TaskState.FAILED, TaskState.TERMINATED -> {
+ TaskState.FAILED, TaskState.TERMINATED -> {
failed++
// Remove guests that have been deleted
this.taskToGuestMap.remove(guest.task)
- guests.remove()
+ guests.remove(guest)
+ }
+ TaskState.COMPLETED -> {
+ completed++
+ this.taskToGuestMap.remove(guest.task)
+ guests.remove(guest)
}
+ TaskState.PAUSED -> {}
else -> invalid++
}
}
@@ -366,9 +370,8 @@ public class SimHost(
totalDowntime += duration
}
- val guests = guests
- for (i in guests.indices) {
- guests[i].updateUptime()
+ for (guest in guests) {
+ guest.updateUptime()
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 8096a67a..fe8cbf2f 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -70,7 +70,7 @@ public class Guest(
*/
public fun start() {
when (state) {
- TaskState.CREATED, TaskState.FAILED -> {
+ TaskState.CREATED, TaskState.FAILED, TaskState.PAUSED -> {
LOGGER.info { "User requested to start task ${task.uid}" }
doStart()
}
@@ -139,17 +139,17 @@ public class Guest(
listener.onStart(this)
}
- /**
- * Stop the guest.
- */
- public fun stop() {
- when (state) {
- TaskState.RUNNING -> doStop(TaskState.COMPLETED)
- TaskState.FAILED -> state = TaskState.TERMINATED
- TaskState.COMPLETED, TaskState.TERMINATED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+// /**
+// * Stop the guest.
+// */
+// public fun stop() {
+// when (state) {
+// TaskState.RUNNING -> doStop(TaskState.COMPLETED)
+// TaskState.FAILED -> state = TaskState.TERMINATED
+// TaskState.COMPLETED, TaskState.PAUSED, TaskState.TERMINATED -> return
+// else -> assert(false) { "Invalid state transition" }
+// }
+// }
/**
* Attempt to stop the task and put it into [target] state.
@@ -157,6 +157,7 @@ public class Guest(
private fun doStop(target: TaskState) {
assert(virtualMachine != null) { "Invalid job state" }
val virtualMachine = this.virtualMachine ?: return
+ this.state = target
if (target == TaskState.FAILED) {
virtualMachine.stopWorkload(Exception("Task has failed"))
} else {
@@ -164,8 +165,6 @@ public class Guest(
}
this.virtualMachine = null
-
- this.state = target
}
/**
@@ -174,21 +173,24 @@ public class Guest(
private fun onStop(target: TaskState) {
updateUptime()
- state = target
+ if (state == TaskState.RUNNING) {
+ // If state is not running, that means state has been changed already and the callback should be ignored
+ state = target
+ }
listener.onStop(this)
}
- /**
- * Delete the guest.
- *
- * This operation will stop the guest if it is running on the host and remove all resources associated with the
- * guest.
- */
- public fun delete() {
- stop()
-
- state = TaskState.FAILED
- }
+// /**
+// * Delete the guest.
+// *
+// * This operation will stop the guest if it is running on the host and remove all resources associated with the
+// * guest.
+// */
+// public fun delete() {
+// stop()
+//
+// state = TaskState.FAILED
+// }
/**
* Fail the guest if it is active.
@@ -203,6 +205,14 @@ public class Guest(
doStop(TaskState.FAILED)
}
+ public fun pause() {
+ if (state != TaskState.RUNNING) {
+ return
+ }
+
+ doStop(TaskState.PAUSED)
+ }
+
/**
* Recover the guest if it is in an error state.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
index 1145c15b..a18856f8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt
@@ -29,7 +29,6 @@ import org.opendc.compute.simulator.telemetry.ComputeMonitor
import org.opendc.compute.simulator.telemetry.OutputFiles
import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostSpec
-import org.opendc.simulator.compute.power.CarbonReceiver
import java.time.Duration
/**
@@ -85,8 +84,7 @@ public fun registerComputeMonitor(
public fun setupHosts(
serviceDomain: String,
specs: List<ClusterSpec>,
- carbonReceivers: List<CarbonReceiver>,
startTime: Long = 0L,
): ProvisioningStep {
- return HostsProvisioningStep(serviceDomain, specs, carbonReceivers, startTime)
+ return HostsProvisioningStep(serviceDomain, specs, startTime)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
index 75508b8d..675ce3a9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt
@@ -29,7 +29,6 @@ import org.opendc.compute.topology.specs.ClusterSpec
import org.opendc.compute.topology.specs.HostSpec
import org.opendc.compute.topology.specs.createSimBatteryPolicy
import org.opendc.simulator.compute.power.CarbonModel
-import org.opendc.simulator.compute.power.CarbonReceiver
import org.opendc.simulator.compute.power.SimPowerSource
import org.opendc.simulator.compute.power.batteries.BatteryAggregator
import org.opendc.simulator.compute.power.batteries.SimBattery
@@ -47,7 +46,6 @@ import org.opendc.simulator.engine.graph.FlowEdge
public class HostsProvisioningStep internal constructor(
private val serviceDomain: String,
private val clusterSpecs: List<ClusterSpec>,
- private val carbonReceivers: List<CarbonReceiver>,
private val startTime: Long = 0L,
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
@@ -77,11 +75,7 @@ public class HostsProvisioningStep internal constructor(
if (carbonFragments != null) {
carbonModel = CarbonModel(engine, carbonFragments, startTime)
carbonModel.addReceiver(simPowerSource)
- for (receiver in carbonReceivers) {
- carbonModel.addReceiver(receiver)
- }
- val computeService = ctx.registry.resolve(serviceDomain, ComputeService::class.java)!!
- carbonModel.addReceiver(computeService)
+ ctx.registry.register(serviceDomain, CarbonModel::class.java, carbonModel)
}
if (cluster.battery != null) {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt
index ecf4804a..c11a6db8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt
@@ -27,6 +27,7 @@ package org.opendc.compute.simulator.scheduler
import org.opendc.compute.simulator.scheduler.filters.ComputeFilter
import org.opendc.compute.simulator.scheduler.filters.RamFilter
import org.opendc.compute.simulator.scheduler.filters.VCpuFilter
+import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler
import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher
import org.opendc.compute.simulator.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.simulator.scheduler.weights.RamWeigher
@@ -135,7 +136,6 @@ public fun createPrefabComputeScheduler(
weighers = listOf(RamWeigher(multiplier = 1.0)),
windowSize = 168,
clock = clock,
- peakShift = false,
random = SplittableRandom(seeder.nextLong()),
)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt
new file mode 100644
index 00000000..4ea526b7
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.scheduler.timeshift
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.launch
+import org.opendc.compute.simulator.service.ComputeService
+import org.opendc.simulator.compute.power.CarbonModel
+import org.opendc.simulator.compute.power.CarbonReceiver
+import java.time.InstantSource
+import java.util.LinkedList
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToInt
+
+public class TaskStopper(
+ private val clock: InstantSource,
+ context: CoroutineContext,
+ private val forecast: Boolean = true,
+ private val forecastThreshold: Double = 0.6,
+ private val forecastSize: Int = 24,
+ private val windowSize: Int = 168,
+) : CarbonReceiver {
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ private val pastCarbonIntensities = LinkedList<Double>()
+ private var carbonRunningSum = 0.0
+ private var isHighCarbon = false
+ private var carbonModel: CarbonModel? = null
+
+ private var service: ComputeService? = null
+ private var client: ComputeService.ComputeClient? = null
+
+ public fun setService(service: ComputeService) {
+ this.service = service
+ this.client = service.newClient()
+ }
+
+ private fun pauseTasks() {
+ for (host in service!!.hosts) {
+ val guests = host.getGuests()
+
+ val snapshots =
+ guests.map {
+ it.virtualMachine!!.makeSnapshot(clock.millis())
+ it.virtualMachine!!.snapshot
+ }
+ val tasks = guests.map { it.task }
+ host.pauseAllTasks()
+
+ for ((task, snapshot) in tasks.zip(snapshots)) {
+ client!!.rescheduleTask(task, snapshot)
+ }
+ }
+ }
+
+ override fun updateCarbonIntensity(newCarbonIntensity: Double) {
+ if (!forecast) {
+ isHighCarbon = noForecastUpdateCarbonIntensity(newCarbonIntensity)
+ } else {
+ val forecast = carbonModel!!.getForecast(forecastSize)
+ val quantileIndex = (forecastSize * forecastThreshold).roundToInt()
+ val thresholdCarbonIntensity = forecast.sorted()[quantileIndex]
+
+ isHighCarbon = newCarbonIntensity > thresholdCarbonIntensity
+ }
+
+ if (isHighCarbon) {
+ scope.launch {
+ pauseTasks()
+ }
+ }
+ }
+
+ private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double): Boolean {
+ this.pastCarbonIntensities.addLast(newCarbonIntensity)
+ this.carbonRunningSum += newCarbonIntensity
+ if (this.pastCarbonIntensities.size > this.windowSize) {
+ this.carbonRunningSum -= this.pastCarbonIntensities.removeFirst()
+ }
+
+ val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size
+
+ isHighCarbon = (newCarbonIntensity > thresholdCarbonIntensity)
+ return isHighCarbon
+ }
+
+ override fun setCarbonModel(carbonModel: CarbonModel?) {
+ this.carbonModel = carbonModel
+ }
+
+ override fun removeCarbonModel(carbonModel: CarbonModel?) {}
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt
index 50081cd6..2f9b4364 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt
@@ -20,8 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.scheduler
+package org.opendc.compute.simulator.scheduler.timeshift
+import org.opendc.compute.simulator.scheduler.ComputeScheduler
+import org.opendc.compute.simulator.scheduler.SchedulingRequest
+import org.opendc.compute.simulator.scheduler.SchedulingResult
+import org.opendc.compute.simulator.scheduler.SchedulingResultType
import org.opendc.compute.simulator.scheduler.filters.HostFilter
import org.opendc.compute.simulator.scheduler.weights.HostWeigher
import org.opendc.compute.simulator.service.HostView
@@ -34,6 +38,7 @@ import java.util.LinkedList
import java.util.SplittableRandom
import java.util.random.RandomGenerator
import kotlin.math.min
+import kotlin.math.roundToInt
public class TimeshiftScheduler(
private val filters: List<HostFilter>,
@@ -41,7 +46,10 @@ public class TimeshiftScheduler(
private val windowSize: Int,
private val clock: InstantSource,
private val subsetSize: Int = 1,
- private val peakShift: Boolean = true,
+ private val forecast: Boolean = true,
+ private val shortForecastThreshold: Double = 0.2,
+ private val longForecastThreshold: Double = 0.35,
+ private val forecastSize: Int = 24,
private val random: RandomGenerator = SplittableRandom(0),
) : ComputeScheduler, CarbonReceiver {
/**
@@ -55,7 +63,9 @@ public class TimeshiftScheduler(
private val pastCarbonIntensities = LinkedList<Double>()
private var carbonRunningSum = 0.0
- private var isLowCarbon = false
+ private var shortLowCarbon = false // Low carbon regime for short tasks (< 2 hours)
+ private var longLowCarbon = false // Low carbon regime for long tasks (>= hours)
+ private var carbonModel: CarbonModel? = null
override fun addHost(host: HostView) {
hosts.add(host)
@@ -75,8 +85,16 @@ public class TimeshiftScheduler(
val task = req.task
- if (!isLowCarbon) {
- if (task.nature.deferrable) {
+ /**
+ If we are not in a low carbon regime, delay tasks.
+ Only delay tasks if they are deferrable and it doesn't violate the deadline.
+ Separate delay thresholds for short and long tasks.
+ */
+ if (task.nature.deferrable) {
+ val durInHours = task.duration.toHours()
+ if ((durInHours < 2 && !shortLowCarbon) ||
+ (durInHours >= 2 && !longLowCarbon)
+ ) {
val currentTime = clock.instant()
val estimatedCompletion = currentTime.plus(task.duration)
val deadline = Instant.ofEpochMilli(task.deadline)
@@ -142,7 +160,32 @@ public class TimeshiftScheduler(
host: HostView?,
) {}
+ /**
+ Compare current carbon intensity to the chosen quantile from the [forecastSize]
+ number of intensity forecasts
+ */
override fun updateCarbonIntensity(newCarbonIntensity: Double) {
+ if (!forecast) {
+ noForecastUpdateCarbonIntensity(newCarbonIntensity)
+ return
+ }
+
+ val forecast = carbonModel!!.getForecast(forecastSize)
+ val forecastSize = forecast.size
+ val shortQuantileIndex = (forecastSize * shortForecastThreshold).roundToInt()
+ val shortCarbonIntensity = forecast.sorted()[shortQuantileIndex]
+ val longQuantileIndex = (forecastSize * longForecastThreshold).roundToInt()
+ val longCarbonIntensity = forecast.sorted()[longQuantileIndex]
+
+ shortLowCarbon = newCarbonIntensity < shortCarbonIntensity
+ longLowCarbon = newCarbonIntensity < longCarbonIntensity
+ }
+
+ /**
+ Compare current carbon intensity to the moving average of the past [windowSize]
+ number of intensity updates
+ */
+ private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double) {
val previousCarbonIntensity =
if (this.pastCarbonIntensities.isEmpty()) {
0.0
@@ -157,22 +200,14 @@ public class TimeshiftScheduler(
val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size
- if (!peakShift) {
- isLowCarbon = newCarbonIntensity < thresholdCarbonIntensity
- return
- }
-
- isLowCarbon = (
- (newCarbonIntensity < thresholdCarbonIntensity) &&
- (newCarbonIntensity > previousCarbonIntensity)
- ) ||
- (
- (newCarbonIntensity < 1.2 * thresholdCarbonIntensity) &&
- isLowCarbon
- )
+ shortLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) &&
+ (newCarbonIntensity > previousCarbonIntensity)
+ longLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity)
}
- override fun setCarbonModel(carbonModel: CarbonModel?) {}
+ override fun setCarbonModel(carbonModel: CarbonModel?) {
+ this.carbonModel = carbonModel
+ }
override fun removeCarbonModel(carbonModel: CarbonModel?) {}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
index 2f9633db..6f6b5bdd 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt
@@ -134,6 +134,11 @@ public object DfltTaskExportColumns {
field = Types.required(INT64).named("num_failures"),
) { it.numFailures }
+ public val NUM_PAUSES: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("num_pauses"),
+ ) { it.numPauses }
+
public val SCHEDULE_TIME: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.optional(INT64).named("schedule_time"),
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
index 79c8e4a6..771ced37 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt
@@ -75,6 +75,8 @@ public interface TaskTableReader : Exportable {
*/
public val numFailures: Int
+ public val numPauses: Int
+
/**
* The [Instant] at which the task was scheduled relative to the start of the workload.
*/
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
index be8d5725..881b9916 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt
@@ -65,6 +65,7 @@ public class TaskTableReaderImpl(
_uptime = table.uptime
_downtime = table.downtime
_numFailures = table.numFailures
+ _numPauses = table.numPauses
_scheduleTime = table.scheduleTime
_submissionTime = table.submissionTime
@@ -114,6 +115,10 @@ public class TaskTableReaderImpl(
get() = _numFailures
private var _numFailures = 0
+ override val numPauses: Int
+ get() = _numPauses
+ private var _numPauses = 0
+
override val submissionTime: Instant?
get() = _submissionTime
private var _submissionTime: Instant? = null
@@ -197,6 +202,7 @@ public class TaskTableReaderImpl(
_downtime = sysStats?.downtime?.toMillis() ?: _downtime
_numFailures = task.numFailures
+ _numPauses = task.numPauses
_submissionTime = task.submittedAt
_scheduleTime = task.scheduledAt
_finishTime = task.finishedAt
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt
index b893c1aa..46c6425e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt
@@ -26,6 +26,7 @@ import io.mockk.every
import io.mockk.mockk
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
+import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler
import org.opendc.compute.simulator.service.TaskNature
import java.time.Duration
import java.time.Instant
@@ -43,6 +44,7 @@ class TimeshiftSchedulerTest {
weighers = emptyList(),
windowSize = 2,
clock = clock,
+ forecast = false,
)
val req = mockk<SchedulingRequest>()
@@ -70,6 +72,7 @@ class TimeshiftSchedulerTest {
weighers = emptyList(),
windowSize = 2,
clock = clock,
+ forecast = false,
)
val req = mockk<SchedulingRequest>()