diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src/main')
7 files changed, 323 insertions, 72 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index fb9dc6e6..6d973b3f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -436,6 +436,7 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { taskQueue.add(request); } tasksPending++; + requestSchedulingCycle(); return request; } @@ -484,7 +485,6 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { iterator = taskQueue.iterator()) { final SchedulingResult result = scheduler.select(iterator); if (result.getResultType() == SchedulingResultType.EMPTY) { - break; } final HostView hv = result.getHost(); diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java index 6d097efb..66b69bde 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -251,7 +251,7 @@ public class ServiceTask { this.numPauses++; } - if ((newState == TaskState.COMPLETED) || newState == TaskState.FAILED) { + if ((newState == TaskState.COMPLETED) || (newState == TaskState.FAILED) || (newState == TaskState.TERMINATED)) { this.finishedAt = this.service.getClock().instant(); } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt index c11a6db8..35c66e44 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt @@ -120,7 +120,6 @@ public fun createPrefabComputeScheduler( ComputeSchedulerEnum.TaskNumMemorizing -> MemorizingScheduler( filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - random = SplittableRandom(seeder.nextLong()), ) ComputeSchedulerEnum.Timeshift -> TimeshiftScheduler( diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt index 25e559fd..4108ed3d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt @@ -25,8 +25,6 @@ package org.opendc.compute.simulator.scheduler import org.opendc.compute.simulator.scheduler.filters.HostFilter import org.opendc.compute.simulator.service.HostView import org.opendc.compute.simulator.service.ServiceTask -import java.util.SplittableRandom -import java.util.random.RandomGenerator /* This scheduler records the number of tasks scheduled on each host. @@ -35,7 +33,6 @@ We filter hosts to check if the specific task can actually run on the host. */ public class MemorizingScheduler( private val filters: List<HostFilter>, - private val random: RandomGenerator = SplittableRandom(0), private val maxTimesSkipped: Int = 7, ) : ComputeScheduler { // We assume that there will be max 200 tasks per host. @@ -81,6 +78,9 @@ public class MemorizingScheduler( return SchedulingResult(SchedulingResultType.FAILURE) } + val maxIters = 10000 + var numIters = 0 + var chosenList: MutableList<HostView>? = null var chosenHost: HostView? = null @@ -91,6 +91,11 @@ public class MemorizingScheduler( continue } + numIters++ + if (numIters > maxIters) { + return SchedulingResult(SchedulingResultType.EMPTY) + } + for (chosenListIndex in minAvailableHost until hostsQueue.size) { chosenList = hostsQueue[chosenListIndex] diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/MemorizingTimeshift.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/MemorizingTimeshift.kt new file mode 100644 index 00000000..443f1f0e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/MemorizingTimeshift.kt @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.timeshift + +import org.opendc.compute.simulator.scheduler.ComputeScheduler +import org.opendc.compute.simulator.scheduler.SchedulingRequest +import org.opendc.compute.simulator.scheduler.SchedulingResult +import org.opendc.compute.simulator.scheduler.SchedulingResultType +import org.opendc.compute.simulator.scheduler.filters.HostFilter +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.simulator.compute.power.CarbonModel +import java.time.Instant +import java.time.InstantSource +import java.util.LinkedList + +public class MemorizingTimeshift( + private val filters: List<HostFilter>, + override val windowSize: Int, + override val clock: InstantSource, + override val forecast: Boolean = true, + override val shortForecastThreshold: Double = 0.2, + override val longForecastThreshold: Double = 0.35, + override val forecastSize: Int = 24, + public val maxTimesSkipped: Int = 7, +) : ComputeScheduler, Timeshifter { + // We assume that there will be max 200 tasks per host. + // The index of a host list is the number of tasks on that host. + private val hostsQueue = List(100) { mutableListOf<HostView>() } + private var minAvailableHost = 0 + private var numHosts = 0 + + override val pastCarbonIntensities: LinkedList<Double> = LinkedList<Double>() + override var carbonRunningSum: Double = 0.0 + override var shortLowCarbon: Boolean = false // Low carbon regime for short tasks (< 2 hours) + override var longLowCarbon: Boolean = false // Low carbon regime for long tasks (>= hours) + override var carbonMod: CarbonModel? = null + + override fun addHost(host: HostView) { + val zeroQueue = hostsQueue[0] + zeroQueue.add(host) + host.priorityIndex = 0 + host.listIndex = zeroQueue.size - 1 + numHosts++ + minAvailableHost = 0 + } + + override fun removeHost(host: HostView) { + val priorityIdx = host.priorityIndex + val listIdx = host.listIndex + val chosenList = hostsQueue[priorityIdx] + + if (chosenList.size == 1) { + chosenList.removeLast() + if (listIdx == minAvailableHost) { + for (i in minAvailableHost + 1..hostsQueue.lastIndex) { + if (hostsQueue[i].size > 0) { + minAvailableHost = i + break + } + } + } + } else { + val lastItem = chosenList.removeLast() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + } + numHosts-- + } + + override fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult { + if (numHosts == 0) { + return SchedulingResult(SchedulingResultType.FAILURE) + } + + val maxIters = 10000 + var numIters = 0 + + var chosenList: MutableList<HostView>? = null + var chosenHost: HostView? = null + + var result: SchedulingResult? = null + taskloop@ for (req in iter) { + if (req.isCancelled) { + iter.remove() + continue + } + + numIters++ + if (numIters > maxIters) { + return SchedulingResult(SchedulingResultType.EMPTY) + } + + val task = req.task + + /** + If we are not in a low carbon regime, delay tasks. + Only delay tasks if they are deferrable and it doesn't violate the deadline. + Separate delay thresholds for short and long tasks. + */ + if (task.nature.deferrable) { + val durInHours = task.duration.toHours() + if ((durInHours < 2 && !shortLowCarbon) || + (durInHours >= 2 && !longLowCarbon) + ) { + val currentTime = clock.instant() + val estimatedCompletion = currentTime.plus(task.duration) + val deadline = Instant.ofEpochMilli(task.deadline) + if (estimatedCompletion.isBefore(deadline)) { + // No need to schedule this task in a high carbon intensity period + continue + } + } + } + + for (chosenListIndex in minAvailableHost until hostsQueue.size) { + chosenList = hostsQueue[chosenListIndex] + + for (host in chosenList) { + val satisfied = filters.all { filter -> filter.test(host, req.task) } + if (satisfied) { + iter.remove() + chosenHost = host + result = SchedulingResult(SchedulingResultType.SUCCESS, host, req) + break@taskloop + } else if (req.timesSkipped >= maxTimesSkipped) { + return SchedulingResult(SchedulingResultType.FAILURE, null, req) + } + } + } + req.timesSkipped++ + } + + if (result == null) return SchedulingResult(SchedulingResultType.EMPTY) // No tasks to schedule that fit + + // Bookkeeping to maintain the calendar priority queue + if (chosenList!!.size == 1) { + chosenList.removeLast() + minAvailableHost++ + } else { + val listIdx = chosenHost!!.listIndex + // Not using removeLast here as it would cause problems during swapping + // if chosenHost is lastItem + val lastItem = chosenList.last() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + chosenList.removeLast() + } + + val nextList = hostsQueue[chosenHost!!.priorityIndex + 1] + nextList.add(chosenHost) + chosenHost.priorityIndex++ + chosenHost.listIndex = nextList.size - 1 + + return result + } + + override fun removeTask( + task: ServiceTask, + host: HostView?, + ) { + if (host == null) return + + val priorityIdx = host.priorityIndex + val listIdx = host.listIndex + val chosenList = hostsQueue[priorityIdx] + val nextList = hostsQueue[priorityIdx - 1] + + if (chosenList.size == 1) { + chosenList.removeLast() + } else { + val lastItem = chosenList.last() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + chosenList.removeLast() + } + + nextList.add(host) + host.priorityIndex-- + host.listIndex = nextList.size - 1 + if (priorityIdx == minAvailableHost) { + minAvailableHost-- + } + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt index f402c5a5..970b7761 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/TimeshiftScheduler.kt @@ -31,27 +31,25 @@ import org.opendc.compute.simulator.scheduler.weights.HostWeigher import org.opendc.compute.simulator.service.HostView import org.opendc.compute.simulator.service.ServiceTask import org.opendc.simulator.compute.power.CarbonModel -import org.opendc.simulator.compute.power.CarbonReceiver import java.time.Instant import java.time.InstantSource import java.util.LinkedList import java.util.SplittableRandom import java.util.random.RandomGenerator import kotlin.math.min -import kotlin.math.roundToInt public class TimeshiftScheduler( private val filters: List<HostFilter>, private val weighers: List<HostWeigher>, - private val windowSize: Int, - private val clock: InstantSource, + override val windowSize: Int, + override val clock: InstantSource, private val subsetSize: Int = 1, - private val forecast: Boolean = true, - private val shortForecastThreshold: Double = 0.2, - private val longForecastThreshold: Double = 0.35, - private val forecastSize: Int = 24, + override val forecast: Boolean = true, + override val shortForecastThreshold: Double = 0.2, + override val longForecastThreshold: Double = 0.35, + override val forecastSize: Int = 24, private val random: RandomGenerator = SplittableRandom(0), -) : ComputeScheduler, CarbonReceiver { +) : ComputeScheduler, Timeshifter { /** * The pool of hosts available to the scheduler. */ @@ -61,11 +59,11 @@ public class TimeshiftScheduler( require(subsetSize >= 1) { "Subset size must be one or greater" } } - private val pastCarbonIntensities = LinkedList<Double>() - private var carbonRunningSum = 0.0 - private var shortLowCarbon = false // Low carbon regime for short tasks (< 2 hours) - private var longLowCarbon = false // Low carbon regime for long tasks (>= hours) - private var carbonModel: CarbonModel? = null + override val pastCarbonIntensities: LinkedList<Double> = LinkedList<Double>() + override var carbonRunningSum: Double = 0.0 + override var shortLowCarbon: Boolean = false // Low carbon regime for short tasks (< 2 hours) + override var longLowCarbon: Boolean = false // Low carbon regime for long tasks (>= hours) + override var carbonMod: CarbonModel? = null override fun addHost(host: HostView) { hosts.add(host) @@ -159,56 +157,4 @@ public class TimeshiftScheduler( task: ServiceTask, host: HostView?, ) {} - - /** - Compare current carbon intensity to the chosen quantile from the [forecastSize] - number of intensity forecasts - */ - override fun updateCarbonIntensity(newCarbonIntensity: Double) { - if (!forecast) { - noForecastUpdateCarbonIntensity(newCarbonIntensity) - return - } - - val forecast = carbonModel!!.getForecast(forecastSize) - val localForecastSize = forecast.size - - val shortQuantileIndex = (localForecastSize * shortForecastThreshold).roundToInt() - val shortCarbonIntensity = forecast.sorted()[shortQuantileIndex] - val longQuantileIndex = (localForecastSize * longForecastThreshold).roundToInt() - val longCarbonIntensity = forecast.sorted()[longQuantileIndex] - - shortLowCarbon = newCarbonIntensity < shortCarbonIntensity - longLowCarbon = newCarbonIntensity < longCarbonIntensity - } - - /** - Compare current carbon intensity to the moving average of the past [windowSize] - number of intensity updates - */ - private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double) { - val previousCarbonIntensity = - if (this.pastCarbonIntensities.isEmpty()) { - 0.0 - } else { - this.pastCarbonIntensities.last() - } - this.pastCarbonIntensities.addLast(newCarbonIntensity) - this.carbonRunningSum += newCarbonIntensity - if (this.pastCarbonIntensities.size > this.windowSize) { - this.carbonRunningSum -= this.pastCarbonIntensities.removeFirst() - } - - val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size - - shortLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) && - (newCarbonIntensity > previousCarbonIntensity) - longLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) - } - - override fun setCarbonModel(carbonModel: CarbonModel?) { - this.carbonModel = carbonModel - } - - override fun removeCarbonModel(carbonModel: CarbonModel?) {} } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/Timeshifter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/Timeshifter.kt new file mode 100644 index 00000000..55754d7a --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/timeshift/Timeshifter.kt @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler.timeshift + +import org.opendc.simulator.compute.power.CarbonModel +import org.opendc.simulator.compute.power.CarbonReceiver +import java.time.InstantSource +import java.util.LinkedList +import kotlin.math.roundToInt + +public interface Timeshifter : CarbonReceiver { + public val windowSize: Int + public val clock: InstantSource + public val forecast: Boolean + public val shortForecastThreshold: Double + public val longForecastThreshold: Double + public val forecastSize: Int + + public val pastCarbonIntensities: LinkedList<Double> + public var carbonRunningSum: Double + public var shortLowCarbon: Boolean // Low carbon regime for short tasks (< 2 hours) + public var longLowCarbon: Boolean // Low carbon regime for long tasks (>= hours) + public var carbonMod: CarbonModel? + + /** + Compare current carbon intensity to the chosen quantile from the [forecastSize] + number of intensity forecasts + */ + override fun updateCarbonIntensity(newCarbonIntensity: Double) { + if (!forecast) { + noForecastUpdateCarbonIntensity(newCarbonIntensity) + return + } + + val forecast = carbonMod!!.getForecast(forecastSize) + val localForecastSize = forecast.size + + val shortQuantileIndex = (localForecastSize * shortForecastThreshold).roundToInt() + val shortCarbonIntensity = forecast.sorted()[shortQuantileIndex] + val longQuantileIndex = (localForecastSize * longForecastThreshold).roundToInt() + val longCarbonIntensity = forecast.sorted()[longQuantileIndex] + + shortLowCarbon = newCarbonIntensity < shortCarbonIntensity + longLowCarbon = newCarbonIntensity < longCarbonIntensity + } + + /** + Compare current carbon intensity to the moving average of the past [windowSize] + number of intensity updates + */ + private fun noForecastUpdateCarbonIntensity(newCarbonIntensity: Double) { + val previousCarbonIntensity = + if (this.pastCarbonIntensities.isEmpty()) { + 0.0 + } else { + this.pastCarbonIntensities.last() + } + this.pastCarbonIntensities.addLast(newCarbonIntensity) + this.carbonRunningSum += newCarbonIntensity + if (this.pastCarbonIntensities.size > this.windowSize) { + this.carbonRunningSum -= this.pastCarbonIntensities.removeFirst() + } + + val thresholdCarbonIntensity = this.carbonRunningSum / this.pastCarbonIntensities.size + + shortLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) && + (newCarbonIntensity > previousCarbonIntensity) + longLowCarbon = (newCarbonIntensity < thresholdCarbonIntensity) + } + + override fun setCarbonModel(carbonModel: CarbonModel?) { + this.carbonMod = carbonModel + } + + override fun removeCarbonModel(carbonModel: CarbonModel?) {} +} |
