diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
13 files changed, 271 insertions, 87 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index b4b34881..fb9dc6e6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -24,7 +24,6 @@ package org.opendc.compute.simulator.service; import java.time.Duration; import java.time.InstantSource; -import java.time.temporal.TemporalAmount; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -186,7 +185,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { task.setState(newState); - if (newState == TaskState.COMPLETED || newState == TaskState.TERMINATED || newState == TaskState.FAILED) { + if (newState == TaskState.COMPLETED + || newState == TaskState.PAUSED + || newState == TaskState.TERMINATED + || newState == TaskState.FAILED) { LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor()); if (activeTasks.remove(task) != null) { @@ -204,7 +206,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { } task.setHost(null); - host.removeTask(task); + host.delete(task); if (newState == TaskState.COMPLETED) { tasksCompleted++; @@ -214,10 +216,11 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { } if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) { - scheduler.removeTask(task, hv); setTaskToBeRemoved(task); } + scheduler.removeTask(task, hv); + // Try to reschedule if needed requestSchedulingCycle(); } @@ -672,7 +675,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { public ServiceTask newTask( @NotNull String name, @NotNull TaskNature nature, - @NotNull TemporalAmount duration, + @NotNull Duration duration, @NotNull Long deadline, @NotNull ServiceFlavor flavor, @NotNull Workload workload, diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java index cada796a..6d097efb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.service; +import java.time.Duration; import java.time.Instant; -import java.time.temporal.TemporalAmount; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -51,7 +51,7 @@ public class ServiceTask { private final String name; private final TaskNature nature; - private final TemporalAmount duration; + private final Duration duration; private final Long deadline; private ServiceFlavor flavor; public Workload workload; @@ -67,13 +67,14 @@ public class ServiceTask { private SchedulingRequest request = null; private int numFailures = 0; + private int numPauses = 0; ServiceTask( ComputeService service, UUID uid, String name, TaskNature nature, - TemporalAmount duration, + Duration duration, Long deadline, ServiceFlavor flavor, Workload workload, @@ -102,7 +103,7 @@ public class ServiceTask { } @NotNull - public TemporalAmount getDuration() { + public Duration getDuration() { return duration; } @@ -165,6 +166,10 @@ public class ServiceTask { return this.numFailures; } + public int getNumPauses() { + return this.numPauses; + } + public void start() { switch (state) { case PROVISIONING: @@ -182,6 +187,11 @@ public class ServiceTask { assert request == null : "Scheduling request already active"; request = service.schedule(this); break; + case PAUSED: + LOGGER.info("User requested to start task after pause {}", uid); + setState(TaskState.PROVISIONING); + request = service.schedule(this, true); + break; case FAILED: LOGGER.info("User requested to start task after failure {}", uid); setState(TaskState.PROVISIONING); @@ -237,6 +247,8 @@ public class ServiceTask { } if (newState == TaskState.FAILED) { this.numFailures++; + } else if (newState == TaskState.PAUSED) { + this.numPauses++; } if ((newState == TaskState.COMPLETED) || newState == TaskState.FAILED) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index 9c58f7ab..d23794ab 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -71,7 +71,7 @@ public class SimHost( * The virtual machines running on the hypervisor. */ private val taskToGuestMap = HashMap<ServiceTask, Guest>() - private val guests = mutableListOf<Guest>() + private val guests = mutableSetOf<Guest>() private var hostState: HostState = HostState.DOWN set(value) { @@ -151,12 +151,20 @@ public class SimHost( // Fail the guest and delete them // This weird loop is the only way I have been able to make it work. while (guests.size > 0) { - val guest = guests[0] + val guest = guests.first() guest.fail() this.delete(guest.task) } } + public fun pauseAllTasks() { + while (guests.size > 0) { + val guest = guests.first() + guest.pause() + this.delete(guest.task) + } + } + public fun recover() { updateUptime() @@ -194,7 +202,7 @@ public class SimHost( } public fun getGuests(): List<Guest> { - return this.guests + return this.guests.toList() } public fun canFit(task: ServiceTask): Boolean { @@ -239,28 +247,19 @@ public class SimHost( guest.start() } - public fun stop(task: ServiceTask) { - val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } - guest.stop() - } +// public fun stop(task: ServiceTask) { +// val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" } +// guest.stop() +// } public fun delete(task: ServiceTask) { val guest = taskToGuestMap[task] ?: return - guest.delete() taskToGuestMap.remove(task) guests.remove(guest) task.host = null } - public fun removeTask(task: ServiceTask) { - val guest = taskToGuestMap[task] ?: return - guest.delete() - - taskToGuestMap.remove(task) - guests.remove(guest) - } - public fun addListener(listener: HostListener) { hostListeners.add(listener) } @@ -281,16 +280,21 @@ public class SimHost( var invalid = 0 var completed = 0 - val guests = guests.listIterator() for (guest in guests) { when (guest.state) { TaskState.RUNNING -> running++ - TaskState.COMPLETED, TaskState.FAILED, TaskState.TERMINATED -> { + TaskState.FAILED, TaskState.TERMINATED -> { failed++ // Remove guests that have been deleted this.taskToGuestMap.remove(guest.task) - guests.remove() + guests.remove(guest) + } + TaskState.COMPLETED -> { + completed++ + this.taskToGuestMap.remove(guest.task) + guests.remove(guest) } + TaskState.PAUSED -> {} else -> invalid++ } } @@ -366,9 +370,8 @@ public class SimHost( totalDowntime += duration } - val guests = guests - for (i in guests.indices) { - guests[i].updateUptime() + for (guest in guests) { + guest.updateUptime() } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 8096a67a..fe8cbf2f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -70,7 +70,7 @@ public class Guest( */ public fun start() { when (state) { - TaskState.CREATED, TaskState.FAILED -> { + TaskState.CREATED, TaskState.FAILED, TaskState.PAUSED -> { LOGGER.info { "User requested to start task ${task.uid}" } doStart() } @@ -139,17 +139,17 @@ public class Guest( listener.onStart(this) } - /** - * Stop the guest. - */ - public fun stop() { - when (state) { - TaskState.RUNNING -> doStop(TaskState.COMPLETED) - TaskState.FAILED -> state = TaskState.TERMINATED - TaskState.COMPLETED, TaskState.TERMINATED -> return - else -> assert(false) { "Invalid state transition" } - } - } +// /** +// * Stop the guest. +// */ +// public fun stop() { +// when (state) { +// TaskState.RUNNING -> doStop(TaskState.COMPLETED) +// TaskState.FAILED -> state = TaskState.TERMINATED +// TaskState.COMPLETED, TaskState.PAUSED, TaskState.TERMINATED -> return +// else -> assert(false) { "Invalid state transition" } +// } +// } /** * Attempt to stop the task and put it into [target] state. @@ -157,6 +157,7 @@ public class Guest( private fun doStop(target: TaskState) { assert(virtualMachine != null) { "Invalid job state" } val virtualMachine = this.virtualMachine ?: return + this.state = target if (target == TaskState.FAILED) { virtualMachine.stopWorkload(Exception("Task has failed")) } else { @@ -164,8 +165,6 @@ public class Guest( } this.virtualMachine = null - - this.state = target } /** @@ -174,21 +173,24 @@ public class Guest( private fun onStop(target: TaskState) { updateUptime() - state = target + if (state == TaskState.RUNNING) { + // If state is not running, that means state has been changed already and the callback should be ignored + state = target + } listener.onStop(this) } - /** - * Delete the guest. - * - * This operation will stop the guest if it is running on the host and remove all resources associated with the - * guest. - */ - public fun delete() { - stop() - - state = TaskState.FAILED - } +// /** +// * Delete the guest. +// * +// * This operation will stop the guest if it is running on the host and remove all resources associated with the +// * guest. +// */ +// public fun delete() { +// stop() +// +// state = TaskState.FAILED +// } /** * Fail the guest if it is active. @@ -203,6 +205,14 @@ public class Guest( doStop(TaskState.FAILED) } + public fun pause() { + if (state != TaskState.RUNNING) { + return + } + + doStop(TaskState.PAUSED) + } + /** * Recover the guest if it is in an error state. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 1145c15b..a18856f8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -29,7 +29,6 @@ import org.opendc.compute.simulator.telemetry.ComputeMonitor import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec -import org.opendc.simulator.compute.power.CarbonReceiver import java.time.Duration /** @@ -85,8 +84,7 @@ public fun registerComputeMonitor( public fun setupHosts( serviceDomain: String, specs: List<ClusterSpec>, - carbonReceivers: List<CarbonReceiver>, startTime: Long = 0L, ): ProvisioningStep { - return HostsProvisioningStep(serviceDomain, specs, carbonReceivers, startTime) + return HostsProvisioningStep(serviceDomain, specs, startTime) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 75508b8d..675ce3a9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -29,7 +29,6 @@ import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec import org.opendc.compute.topology.specs.createSimBatteryPolicy import org.opendc.simulator.compute.power.CarbonModel -import org.opendc.simulator.compute.power.CarbonReceiver import org.opendc.simulator.compute.power.SimPowerSource import org.opendc.simulator.compute.power.batteries.BatteryAggregator import org.opendc.simulator.compute.power.batteries.SimBattery @@ -47,7 +46,6 @@ import org.opendc.simulator.engine.graph.FlowEdge public class HostsProvisioningStep internal constructor( private val serviceDomain: String, private val clusterSpecs: List<ClusterSpec>, - private val carbonReceivers: List<CarbonReceiver>, private val startTime: Long = 0L, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { @@ -77,11 +75,7 @@ public class HostsProvisioningStep internal constructor( if (carbonFragments != null) { carbonModel = CarbonModel(engine, carbonFragments, startTime) carbonModel.addReceiver(simPowerSource) - for (receiver in carbonReceivers) { - carbonModel.addReceiver(receiver) - } - val computeService = ctx.registry.resolve(serviceDomain, ComputeService::class.java)!! - carbonModel.addReceiver(computeService) + ctx.registry.register(serviceDomain, CarbonModel::class.java, carbonModel) } if (cluster.battery != null) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt index ecf4804a..c11a6db8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt @@ -27,6 +27,7 @@ package org.opendc.compute.simulator.scheduler import org.opendc.compute.simulator.scheduler.filters.ComputeFilter import org.opendc.compute.simulator.scheduler.filters.RamFilter import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher import org.opendc.compute.simulator.scheduler.weights.InstanceCountWeigher import org.opendc.compute.simulator.scheduler.weights.RamWeigher @@ -135,7 +136,6 @@ public fun createPrefabComputeScheduler( weighers = listOf(RamWeigher(multiplier = 1.0)), windowSize = 168, clock = clock, - peakShift = false, random = SplittableRandom(seeder.nextLong()), ) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt new file mode 100644 index 00000000..4ea526b7 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TaskStopper.kt @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.timeshift + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.simulator.compute.power.CarbonModel +import org.opendc.simulator.compute.power.CarbonReceiver +import java.time.InstantSource +import java.util.LinkedList +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToInt + +public class TaskStopper( + private val clock: InstantSource, + context: CoroutineContext, + private val forecast: Boolean = true, + private val forecastThreshold: Double = 0.6, + private val forecastSize: Int = 24, + private val windowSize: Int = 168, +) : CarbonReceiver { + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + private val pastCarbonIntensities = LinkedList<Double>() + private var carbonRunningSum = 0.0 + private var isHighCarbon = false + private var carbonModel: CarbonModel? = null + + private var service: ComputeService? = null + private var client: ComputeService.ComputeClient? = null + + public fun setService(service: ComputeService) { + this.service = service + this.client = service.newClient() + } + + private fun pauseTasks() { + for (host in service!!.hosts) { + val guests = host.getGuests() + + val snapshots = + guests.map { + it.virtualMachine!!.makeSnapshot(clock.millis()) + it.virtualMachine!!.snapshot + } + val tasks = guests.map { it.task } + host.pauseAllTasks() + + for ((task, snapshot) in tasks.zip(snapshots)) { + client!!.rescheduleTask(task, snapshot) + } + } + } + + override fun updateCarbonIntensity(newCarbonIntensity: Double) { + if (!forecast) { + isHighCarbon = noForecastUpdateCarbonIntensity(newCarbonIntensity) + } else { + val forecast = carbonModel!!.getForecast(forecastSize) + val quantileIndex = (forecastSize * forecastThreshold).roundToInt() + val thresholdCarbonIntensity = forecast.sorted()[quantileIndex] + + isHighCarbon = newCarbonIntensity > thresholdCarbonIntensity + } + + if (isHighCarbon) { + scope.launch { + pauseTasks() + } + } + } + + private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double): Boolean { + this.pastCarbonIntensities.addLast(newCarbonIntensity) + this.carbonRunningSum += newCarbonIntensity + if (this.pastCarbonIntensities.size > this.windowSize) { + this.carbonRunningSum -= this.pastCarbonIntensities.removeFirst() + } + + val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size + + isHighCarbon = (newCarbonIntensity > thresholdCarbonIntensity) + return isHighCarbon + } + + override fun setCarbonModel(carbonModel: CarbonModel?) { + this.carbonModel = carbonModel + } + + override fun removeCarbonModel(carbonModel: CarbonModel?) {} +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt index 50081cd6..2f9b4364 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt @@ -20,8 +20,12 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.scheduler +package org.opendc.compute.simulator.scheduler.timeshift +import org.opendc.compute.simulator.scheduler.ComputeScheduler +import org.opendc.compute.simulator.scheduler.SchedulingRequest +import org.opendc.compute.simulator.scheduler.SchedulingResult +import org.opendc.compute.simulator.scheduler.SchedulingResultType import org.opendc.compute.simulator.scheduler.filters.HostFilter import org.opendc.compute.simulator.scheduler.weights.HostWeigher import org.opendc.compute.simulator.service.HostView @@ -34,6 +38,7 @@ import java.util.LinkedList import java.util.SplittableRandom import java.util.random.RandomGenerator import kotlin.math.min +import kotlin.math.roundToInt public class TimeshiftScheduler( private val filters: List<HostFilter>, @@ -41,7 +46,10 @@ public class TimeshiftScheduler( private val windowSize: Int, private val clock: InstantSource, private val subsetSize: Int = 1, - private val peakShift: Boolean = true, + private val forecast: Boolean = true, + private val shortForecastThreshold: Double = 0.2, + private val longForecastThreshold: Double = 0.35, + private val forecastSize: Int = 24, private val random: RandomGenerator = SplittableRandom(0), ) : ComputeScheduler, CarbonReceiver { /** @@ -55,7 +63,9 @@ public class TimeshiftScheduler( private val pastCarbonIntensities = LinkedList<Double>() private var carbonRunningSum = 0.0 - private var isLowCarbon = false + private var shortLowCarbon = false // Low carbon regime for short tasks (< 2 hours) + private var longLowCarbon = false // Low carbon regime for long tasks (>= hours) + private var carbonModel: CarbonModel? = null override fun addHost(host: HostView) { hosts.add(host) @@ -75,8 +85,16 @@ public class TimeshiftScheduler( val task = req.task - if (!isLowCarbon) { - if (task.nature.deferrable) { + /** + If we are not in a low carbon regime, delay tasks. + Only delay tasks if they are deferrable and it doesn't violate the deadline. + Separate delay thresholds for short and long tasks. + */ + if (task.nature.deferrable) { + val durInHours = task.duration.toHours() + if ((durInHours < 2 && !shortLowCarbon) || + (durInHours >= 2 && !longLowCarbon) + ) { val currentTime = clock.instant() val estimatedCompletion = currentTime.plus(task.duration) val deadline = Instant.ofEpochMilli(task.deadline) @@ -142,7 +160,32 @@ public class TimeshiftScheduler( host: HostView?, ) {} + /** + Compare current carbon intensity to the chosen quantile from the [forecastSize] + number of intensity forecasts + */ override fun updateCarbonIntensity(newCarbonIntensity: Double) { + if (!forecast) { + noForecastUpdateCarbonIntensity(newCarbonIntensity) + return + } + + val forecast = carbonModel!!.getForecast(forecastSize) + val forecastSize = forecast.size + val shortQuantileIndex = (forecastSize * shortForecastThreshold).roundToInt() + val shortCarbonIntensity = forecast.sorted()[shortQuantileIndex] + val longQuantileIndex = (forecastSize * longForecastThreshold).roundToInt() + val longCarbonIntensity = forecast.sorted()[longQuantileIndex] + + shortLowCarbon = newCarbonIntensity < shortCarbonIntensity + longLowCarbon = newCarbonIntensity < longCarbonIntensity + } + + /** + Compare current carbon intensity to the moving average of the past [windowSize] + number of intensity updates + */ + private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double) { val previousCarbonIntensity = if (this.pastCarbonIntensities.isEmpty()) { 0.0 @@ -157,22 +200,14 @@ public class TimeshiftScheduler( val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size - if (!peakShift) { - isLowCarbon = newCarbonIntensity < thresholdCarbonIntensity - return - } - - isLowCarbon = ( - (newCarbonIntensity < thresholdCarbonIntensity) && - (newCarbonIntensity > previousCarbonIntensity) - ) || - ( - (newCarbonIntensity < 1.2 * thresholdCarbonIntensity) && - isLowCarbon - ) + shortLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) && + (newCarbonIntensity > previousCarbonIntensity) + longLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) } - override fun setCarbonModel(carbonModel: CarbonModel?) {} + override fun setCarbonModel(carbonModel: CarbonModel?) { + this.carbonModel = carbonModel + } override fun removeCarbonModel(carbonModel: CarbonModel?) {} } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt index 2f9633db..6f6b5bdd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt @@ -134,6 +134,11 @@ public object DfltTaskExportColumns { field = Types.required(INT64).named("num_failures"), ) { it.numFailures } + public val NUM_PAUSES: ExportColumn<TaskTableReader> = + ExportColumn( + field = Types.required(INT64).named("num_pauses"), + ) { it.numPauses } + public val SCHEDULE_TIME: ExportColumn<TaskTableReader> = ExportColumn( field = Types.optional(INT64).named("schedule_time"), diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt index 79c8e4a6..771ced37 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt @@ -75,6 +75,8 @@ public interface TaskTableReader : Exportable { */ public val numFailures: Int + public val numPauses: Int + /** * The [Instant] at which the task was scheduled relative to the start of the workload. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt index be8d5725..881b9916 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt @@ -65,6 +65,7 @@ public class TaskTableReaderImpl( _uptime = table.uptime _downtime = table.downtime _numFailures = table.numFailures + _numPauses = table.numPauses _scheduleTime = table.scheduleTime _submissionTime = table.submissionTime @@ -114,6 +115,10 @@ public class TaskTableReaderImpl( get() = _numFailures private var _numFailures = 0 + override val numPauses: Int + get() = _numPauses + private var _numPauses = 0 + override val submissionTime: Instant? get() = _submissionTime private var _submissionTime: Instant? = null @@ -197,6 +202,7 @@ public class TaskTableReaderImpl( _downtime = sysStats?.downtime?.toMillis() ?: _downtime _numFailures = task.numFailures + _numPauses = task.numPauses _submissionTime = task.submittedAt _scheduleTime = task.scheduledAt _finishTime = task.finishedAt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt index b893c1aa..46c6425e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/scheduler/TimeshiftSchedulerTest.kt @@ -26,6 +26,7 @@ import io.mockk.every import io.mockk.mockk import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.scheduler.timeshift.TimeshiftScheduler import org.opendc.compute.simulator.service.TaskNature import java.time.Duration import java.time.Instant @@ -43,6 +44,7 @@ class TimeshiftSchedulerTest { weighers = emptyList(), windowSize = 2, clock = clock, + forecast = false, ) val req = mockk<SchedulingRequest>() @@ -70,6 +72,7 @@ class TimeshiftSchedulerTest { weighers = emptyList(), windowSize = 2, clock = clock, + forecast = false, ) val req = mockk<SchedulingRequest>() |
