diff options
| author | Sacheendra Talluri <sacheendra.t@gmail.com> | 2025-01-16 15:53:15 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-16 15:53:15 +0100 |
| commit | 1fc201745b1984db492350ab5b4e11d2a3363aa5 (patch) | |
| tree | 100f8da878806f676e080027057c5ad136a2c8ea /opendc-compute/opendc-compute-simulator/src/main/kotlin | |
| parent | 39aeb8e07d640fee5e3ba1b4d64eb3a3a964648b (diff) | |
Add support for schedulers which can receive task state change updates (#290)
* Change scheduler API to include task removal and add tests
* Check if memorizing schduler works with the whole system
* Spotless apply
* Expand function name and improve documentation
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src/main/kotlin')
7 files changed, 227 insertions, 80 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt index f0a2c3b4..f702ace9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeScheduler.kt @@ -40,10 +40,43 @@ public interface ComputeScheduler { public fun removeHost(host: HostView) /** - * Select a host for the specified [task]. + * Select a host for the specified [iter]. + * We implicity assume that the task has been scheduled onto the host. * - * @param task The server to select a host for. + * @param iter The server to select a host for. * @return The host to schedule the server on or `null` if no server is available. */ - public fun select(task: ServiceTask): HostView? + public fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult + + /** + * Inform the scheduler that a [task] has been removed from the [host]. + * Could be due to completion or failure. + */ + public fun removeTask( + task: ServiceTask, + host: HostView?, + ) +} + +/** + * A request to schedule a [ServiceTask] onto one of the [SimHost]s. + */ +public data class SchedulingRequest internal constructor( + public val task: ServiceTask, + public val submitTime: Long, +) { + public var isCancelled: Boolean = false + public var timesSkipped: Int = 0 } + +public enum class SchedulingResultType { + SUCCESS, + FAILURE, + EMPTY, +} + +public data class SchedulingResult( + val resultType: SchedulingResultType, + val host: HostView? = null, + val req: SchedulingRequest? = null, +) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt index ec3aedcb..7f4f2f07 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ComputeSchedulers.kt @@ -44,15 +44,13 @@ public enum class ComputeSchedulerEnum { ProvisionedCores, ProvisionedCoresInv, Random, - Replay, } public fun createComputeScheduler( name: String, seeder: RandomGenerator, - placements: Map<String, String> = emptyMap(), ): ComputeScheduler { - return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder, placements) + return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder) } /** @@ -61,7 +59,6 @@ public fun createComputeScheduler( public fun createComputeScheduler( name: ComputeSchedulerEnum, seeder: RandomGenerator, - placements: Map<String, String> = emptyMap(), ): ComputeScheduler { val cpuAllocationRatio = 1.0 val ramAllocationRatio = 1.5 @@ -113,6 +110,5 @@ public fun createComputeScheduler( subsetSize = Int.MAX_VALUE, random = SplittableRandom(seeder.nextLong()), ) - ComputeSchedulerEnum.Replay -> ReplayScheduler(placements) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt index 9fd3a862..832482eb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt @@ -65,7 +65,20 @@ public class FilterScheduler( hosts.remove(host) } - override fun select(task: ServiceTask): HostView? { + override fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult { + var req = iter.next() + + while (req.isCancelled) { + iter.remove() + if (iter.hasNext()) { + req = iter.next() + } else { + // No tasks in queue + return SchedulingResult(SchedulingResultType.EMPTY) + } + } + + val task = req.task val hosts = hosts val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } } @@ -102,10 +115,18 @@ public class FilterScheduler( } // fixme: currently finding no matching hosts can result in an error - return when (val maxSize = min(subsetSize, subset.size)) { - 0 -> null - 1 -> subset[0] - else -> subset[random.nextInt(maxSize)] + val maxSize = min(subsetSize, subset.size) + if (maxSize == 0) { + return SchedulingResult(SchedulingResultType.FAILURE, null, req) + } else { + iter.remove() + return SchedulingResult(SchedulingResultType.SUCCESS, subset[random.nextInt(maxSize)], req) } } + + override fun removeTask( + task: ServiceTask, + host: HostView?, + ) { + } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt new file mode 100644 index 00000000..d3b590f7 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/MemorizingScheduler.kt @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.scheduler + +import org.opendc.compute.simulator.scheduler.filters.HostFilter +import org.opendc.compute.simulator.service.HostView +import org.opendc.compute.simulator.service.ServiceTask +import java.util.SplittableRandom +import java.util.random.RandomGenerator + +/* +This scheduler records the number of tasks scheduled on each host. +When scheduling a new task, it assign the next task to the host with the least number of tasks. +We filter hosts to check if the specific task can actually run on the host. + */ +public class MemorizingScheduler( + private val filters: List<HostFilter>, + private val random: RandomGenerator = SplittableRandom(0), + private val maxTimesSkipped: Int = 7, +) : ComputeScheduler { + // We assume that there will be max 200 tasks per host. + // The index of a host list is the number of tasks on that host. + private val hostsQueue = List(200, { mutableListOf<HostView>() }) + private var minAvailableHost = 0 + private var numHosts = 0 + + override fun addHost(host: HostView) { + val zeroQueue = hostsQueue[0] + zeroQueue.add(host) + host.priorityIndex = 0 + host.listIndex = zeroQueue.size - 1 + numHosts++ + minAvailableHost = 0 + } + + override fun removeHost(host: HostView) { + val priorityIdx = host.priorityIndex + val listIdx = host.listIndex + val chosenList = hostsQueue[priorityIdx] + + if (listIdx == chosenList.size - 1) { + chosenList.removeLast() + if (listIdx == minAvailableHost) { + for (i in minAvailableHost + 1..hostsQueue.lastIndex) { + if (hostsQueue[i].size > 0) { + minAvailableHost = i + break + } + } + } + } else { + val lastItem = chosenList.removeLast() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + } + numHosts-- + } + + override fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult { + if (numHosts == 0) { + return SchedulingResult(SchedulingResultType.FAILURE) + } + + var chosenList: MutableList<HostView>? = null + var chosenHost: HostView? = null + + var result: SchedulingResult? = null + taskloop@ for (req in iter) { + if (req.isCancelled) { + iter.remove() + } + + for (chosenListIndex in minAvailableHost until hostsQueue.size) { + chosenList = hostsQueue[chosenListIndex] + + for (host in chosenList) { + val satisfied = filters.all { filter -> filter.test(host, req.task) } + if (satisfied) { + iter.remove() + chosenHost = host + result = SchedulingResult(SchedulingResultType.SUCCESS, host, req) + break@taskloop + } else if (req.timesSkipped >= maxTimesSkipped) { + return SchedulingResult(SchedulingResultType.FAILURE, null, req) + } + } + } + req.timesSkipped++ + } + + if (result == null) return SchedulingResult(SchedulingResultType.EMPTY) // No tasks to schedule that fit + + // Bookkeeping to maintain the calendar priority queue + val listIdx = chosenHost!!.listIndex + + if (listIdx == chosenList!!.size - 1) { + chosenList.removeLast() + if (chosenList.isEmpty()) minAvailableHost++ + } else { + val lastItem = chosenList.removeLast() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + } + + val nextList = hostsQueue[chosenHost.priorityIndex + 1] + nextList.add(chosenHost) + chosenHost.priorityIndex++ + chosenHost.listIndex = nextList.size - 1 + + return result + } + + override fun removeTask( + task: ServiceTask, + host: HostView?, + ) { + if (host == null) return + + val priorityIdx = host.priorityIndex + val listIdx = host.listIndex + val chosenList = hostsQueue[priorityIdx] + val nextList = hostsQueue[priorityIdx - 1] + + if (listIdx == chosenList.size - 1) { + chosenList.removeLast() + if (priorityIdx == minAvailableHost) { + minAvailableHost-- + } + } else { + val lastItem = chosenList.removeLast() + chosenList[listIdx] = lastItem + lastItem.listIndex = listIdx + } + nextList.add(host) + host.priorityIndex-- + host.listIndex = nextList.size - 1 + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt deleted file mode 100644 index 43e366d9..00000000 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/ReplayScheduler.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator.scheduler - -import mu.KotlinLogging -import org.opendc.compute.simulator.service.HostView -import org.opendc.compute.simulator.service.ServiceTask - -/** - * Policy replaying VM-cluster assignment. - * - * Within each cluster, the active servers on each node determine which node gets - * assigned the VM image. - */ -public class ReplayScheduler(private val vmPlacements: Map<String, String>) : ComputeScheduler { - private val logger = KotlinLogging.logger {} - - /** - * The pool of hosts available to the scheduler. - */ - private val hosts = mutableListOf<HostView>() - - override fun addHost(host: HostView) { - hosts.add(host) - } - - override fun removeHost(host: HostView) { - hosts.remove(host) - } - - override fun select(task: ServiceTask): HostView? { - val clusterName = - vmPlacements[task.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${task.name}") - val machinesInCluster = hosts.filter { it.host.getName().contains(clusterName) } - - if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${task.name}, assigning randomly." } - return hosts.maxByOrNull { it.availableMemory } - } - - return machinesInCluster.maxByOrNull { it.availableMemory } - ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign") - } -} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt index 256caa94..4e63baf4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/filters/VCpuCapacityFilter.kt @@ -37,6 +37,10 @@ public class VCpuCapacityFilter : HostFilter { val requiredCapacity = task.flavor.meta["cpu-capacity"] as? Double val availableCapacity = host.host.getModel().cpuCapacity - return requiredCapacity == null || availableCapacity >= (requiredCapacity / task.flavor.coreCount) + return ( + requiredCapacity == null || + (availableCapacity / host.host.getModel().coreCount) + >= (requiredCapacity / task.flavor.coreCount) + ) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt index b6c43c10..aa6fdf3b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/weights/CoreRamWeigher.kt @@ -37,7 +37,7 @@ public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher host: HostView, task: ServiceTask, ): Double { - return host.availableMemory.toDouble() + return multiplier * (host.availableMemory.toDouble() / host.host.getModel().coreCount) } override fun toString(): String = "CoreRamWeigher" |
