From cc9310efad6177909ff2f7415384d7c393383106 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 5 May 2021 23:14:56 +0200 Subject: chore: Address deprecations due to Kotlin 1.5 This change addresses the deprecations that were caused by the migration to Kotlin 1.5. --- .../main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 0fd5b2a4..1fe90454 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -59,7 +59,7 @@ public class FilterScheduler(private val filters: List, private val true } .sortedByDescending { host -> - weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor } + weighers.sumOf { (weigher, factor) -> weigher.getWeight(host, server) * factor } } .firstOrNull() } -- cgit v1.2.3 From ce2d730159ae24c7cebb22ec42d094fe5fdc888f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 13 Aug 2021 17:14:53 +0200 Subject: build: Update Kotlin dependencies This change updates the Kotlin dependencies used by OpenDC to their latest version. --- .../opendc/compute/service/ComputeServiceTest.kt | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index a6258845..7817c473 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -167,7 +167,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.ERROR, server.state) } @@ -180,7 +180,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.ERROR, server.state) } @@ -193,7 +193,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.ERROR, server.state) } @@ -207,7 +207,7 @@ internal class ComputeServiceTest { server.start() server.stop() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.TERMINATED, server.state) } @@ -228,7 +228,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(10 * 60 * 1000) + delay(10L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -254,12 +254,12 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.UP listeners.forEach { it.onStateChanged(host, HostState.UP) } - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -284,13 +284,13 @@ internal class ComputeServiceTest { val image = client.newImage("test") val server = client.newServer("test", image, flavor, start = false) - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.DOWN listeners.forEach { it.onStateChanged(host, HostState.DOWN) } server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -344,7 +344,7 @@ internal class ComputeServiceTest { // Start server server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) coVerify { host.spawn(capture(slot), true) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -383,7 +383,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) -- cgit v1.2.3 From c1988fa1b08011f716194f48da10386a236ffd7f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 22 Aug 2021 13:23:17 +0200 Subject: fix(compute): Track failed servers with counters correctly --- .../kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt | 2 ++ 1 file changed, 2 insertions(+) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 8af5f86e..e7807177 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -477,6 +477,8 @@ internal class ComputeServiceImpl( if (newState == ServerState.RUNNING) { _runningServers.add(1) + } else if (newState == ServerState.ERROR) { + _runningServers.add(-1) } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } -- cgit v1.2.3 From b8f64c1d3df2c990df8941cd036222fab2def9fa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 22 Aug 2021 13:23:53 +0200 Subject: refactor(compute): Update FilterScheduler to follow OpenStack's Nova This change updates the FilterScheduler implementation to follow more closely the scheduler implementation in OpenStack's Nova. We now normalize the weights, support many of the filters and weights in OpenStack and support overcommitting resources. --- .../compute/service/scheduler/FilterScheduler.kt | 64 +++- .../scheduler/filters/ComputeCapabilitiesFilter.kt | 40 -- .../compute/service/scheduler/filters/RamFilter.kt | 50 +++ .../service/scheduler/filters/VCpuFilter.kt | 47 +++ .../service/scheduler/weights/CoreMemoryWeigher.kt | 37 -- .../service/scheduler/weights/CoreRamWeigher.kt | 41 +++ .../service/scheduler/weights/HostWeigher.kt | 40 +- .../scheduler/weights/InstanceCountWeigher.kt | 2 +- .../service/scheduler/weights/MemoryWeigher.kt | 37 -- .../scheduler/weights/ProvisionedCoresWeigher.kt | 37 -- .../service/scheduler/weights/RamWeigher.kt | 40 ++ .../service/scheduler/weights/RandomWeigher.kt | 36 -- .../service/scheduler/weights/VCpuWeigher.kt | 44 +++ .../opendc/compute/service/ComputeServiceTest.kt | 9 +- .../service/scheduler/FilterSchedulerTest.kt | 407 +++++++++++++++++++++ 15 files changed, 727 insertions(+), 204 deletions(-) delete mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt delete mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt delete mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt delete mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt delete mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt create mode 100644 opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 1fe90454..8c2d4715 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -26,6 +26,8 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView import org.opendc.compute.service.scheduler.filters.HostFilter import org.opendc.compute.service.scheduler.weights.HostWeigher +import java.util.* +import kotlin.math.min /** * A [ComputeScheduler] implementation that uses filtering and weighing passes to select @@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher * * This implementation is based on the filter scheduler from OpenStack Nova. * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html + * + * @param filters The list of filters to apply when searching for an appropriate host. + * @param weighers The list of weighers to apply when searching for an appropriate host. + * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen. + * @param random A [Random] instance for selecting */ -public class FilterScheduler(private val filters: List, private val weighers: List>) : ComputeScheduler { +public class FilterScheduler( + private val filters: List, + private val weighers: List, + private val subsetSize: Int = 1, + private val random: Random = Random(0) +) : ComputeScheduler { /** * The pool of hosts available to the scheduler. */ private val hosts = mutableListOf() + init { + require(subsetSize >= 1) { "Subset size must be one or greater" } + } + override fun addHost(host: HostView) { hosts.add(host) } @@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List, private val } override fun select(server: Server): HostView? { - return hosts.asSequence() - .filter { host -> - for (filter in filters) { - if (!filter.test(host, server)) - return@filter false + val hosts = hosts + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } } + + val subset = if (weighers.isNotEmpty()) { + val results = weighers.map { it.getWeights(filteredHosts, server) } + val weights = DoubleArray(filteredHosts.size) + + for (result in results) { + val min = result.min + val range = (result.max - min) + + // Skip result if all weights are the same + if (range == 0.0) { + continue } - true - } - .sortedByDescending { host -> - weighers.sumOf { (weigher, factor) -> weigher.getWeight(host, server) * factor } + val multiplier = result.multiplier + val factor = multiplier / range + + for ((i, weight) in result.weights.withIndex()) { + weights[i] += factor * (weight - min) + } } - .firstOrNull() + + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } + + return when (val maxSize = min(subsetSize, subset.size)) { + 0 -> null + 1 -> subset[0] + else -> subset[random.nextInt(maxSize)] + } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt deleted file mode 100644 index 072440c5..00000000 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.scheduler.filters - -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView - -/** - * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server - * flavor. - */ -public class ComputeCapabilitiesFilter : HostFilter { - override fun test(host: HostView, server: Server): Boolean { - val fitsMemory = host.availableMemory >= server.flavor.memorySize - val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount - return fitsMemory && fitsCpu - } - - override fun toString(): String = "ComputeCapabilitiesFilter" -} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt new file mode 100644 index 00000000..a470a453 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host. + * + * @param allocationRatio Virtual RAM to physical RAM allocation ratio. + */ +public class RamFilter(private val allocationRatio: Double) : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requested = server.flavor.memorySize + val available = host.availableMemory + val total = host.host.model.memorySize + + // Do not allow an instance to overcommit against itself, only against + // other instances. + if (requested > total) { + return false + } + + val limit = total * allocationRatio + val used = total - available + val usable = limit - used + return usable >= requested + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt new file mode 100644 index 00000000..abdd79f1 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. + */ +public class VCpuFilter(private val allocationRatio: Double) : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requested = server.flavor.cpuCount + val total = host.host.model.cpuCount + val limit = total * allocationRatio + + // Do not allow an instance to overcommit against itself, only against other instances + if (requested > total) { + return false + } + + val free = limit - host.provisionedCores + return free >= requested + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt deleted file mode 100644 index 12e6510e..00000000 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.scheduler.weights - -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView - -/** - * A [HostWeigher] that weighs the hosts based on the available memory per core on the host. - */ -public class CoreMemoryWeigher : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { - return host.availableMemory.toDouble() / host.host.model.cpuCount - } - - override fun toString(): String = "CoreMemoryWeigher" -} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt new file mode 100644 index 00000000..d668fdaf --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.weights + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostWeigher] that weighs the hosts based on the available memory per core on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available core memory, and a negative number will result in the scheduler preferring hosts with less available core + * memory. + */ +public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight(host: HostView, server: Server): Double { + return host.availableMemory.toDouble() / host.host.model.cpuCount + } + + override fun toString(): String = "CoreRamWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt index d48ee9e0..aca8c4e6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt @@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler /** * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request. */ -public fun interface HostWeigher { +public interface HostWeigher { + /** + * The multiplier for the weigher. + */ + public val multiplier: Double + /** * Obtain the weight of the specified [host] when scheduling the specified [server]. */ public fun getWeight(host: HostView, server: Server): Double + + /** + * Obtain the weights for [hosts] when scheduling the specified [server]. + */ + public fun getWeights(hosts: List, server: Server): Result { + val weights = DoubleArray(hosts.size) + var min = Double.MAX_VALUE + var max = Double.MIN_VALUE + + for ((i, host) in hosts.withIndex()) { + val weight = getWeight(host, server) + weights[i] = weight + min = kotlin.math.min(min, weight) + max = kotlin.math.max(max, weight) + } + + return Result(weights, min, max, multiplier) + } + + /** + * A result returned by the weigher. + * + * @param weights The weights returned by the weigher. + * @param min The minimum weight returned. + * @param max The maximum weight returned. + * @param multiplier The weight multiplier to use. + */ + public class Result( + public val weights: DoubleArray, + public val min: Double, + public val max: Double, + public val multiplier: Double, + ) } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt index 2ef733e5..732cbe03 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt @@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView /** * A [HostWeigher] that weighs the hosts based on the number of instances on the host. */ -public class InstanceCountWeigher : HostWeigher { +public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.instanceCount.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt deleted file mode 100644 index 115d8e4d..00000000 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.scheduler.weights - -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView - -/** - * A [HostWeigher] that weighs the hosts based on the available memory on the host. - */ -public class MemoryWeigher : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { - return host.availableMemory.toDouble() - } - - override fun toString(): String = "MemoryWeigher" -} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt deleted file mode 100644 index df5bcd6e..00000000 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.scheduler.weights - -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView - -/** - * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host. - */ -public class ProvisionedCoresWeigher : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { - return host.provisionedCores.toDouble() - } - - override fun toString(): String = "ProvisionedCoresWeigher" -} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt new file mode 100644 index 00000000..d18d31f4 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.weights + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available memory, and a negative number will result in the scheduler preferring hosts with less memory. + */ +public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { + override fun getWeight(host: HostView, server: Server): Double { + return host.availableMemory.toDouble() + } + + override fun toString(): String = "RamWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt deleted file mode 100644 index 1615df3a..00000000 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.scheduler.weights - -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView -import java.util.* - -/** - * A [HostWeigher] that assigns random weights to each host every selection. - */ -public class RandomWeigher(private val random: Random) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double = random.nextDouble() - - override fun toString(): String = "RandomWeigher" -} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt new file mode 100644 index 00000000..4a22269b --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.weights + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. + */ +public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher { + + init { + require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" } + } + + override fun getWeight(host: HostView, server: Server): Double { + return host.host.model.cpuCount * allocationRatio - host.provisionedCores + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 7817c473..c6c01ea2 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.MemoryWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import java.util.* @@ -57,8 +58,8 @@ internal class ComputeServiceTest { scope = SimulationCoroutineScope() val clock = scope.clock val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), + weighers = listOf(RamWeigher()) ) val meter = MeterProvider.noop().get("opendc-compute") service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt new file mode 100644 index 00000000..cafd4498 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.HostModel +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.InstanceCountFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Test suite for the [FilterScheduler]. + */ +internal class FilterSchedulerTest { + @Test + fun testInvalidSubsetSize() { + assertThrows { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = 0 + ) + } + + assertThrows { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = -2 + ) + } + } + + @Test + fun testNoHosts() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testNoFiltersAndSchedulers() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostA, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testNoFiltersAndSchedulersRandom() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(1) + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostB, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testHostIsDown() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk() + every { host.host.state } returns HostState.DOWN + + scheduler.addHost(host) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testHostIsUp() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk() + every { host.host.state } returns HostState.UP + + scheduler.addHost(host) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(host, scheduler.select(server)) + } + + @Test + fun testRamFilter() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 512 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 2048 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.5)), + weighers = emptyList(), + ) + + val host = mockk() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.availableMemory } returns 2048 + + scheduler.addHost(host) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 2300 + + assertNull(scheduler.select(server)) + } + + @Test + fun testVCpuFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 3 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(16.0)), + weighers = emptyList(), + ) + + val host = mockk() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.provisionedCores } returns 0 + + scheduler.addHost(host) + + val server = mockk() + every { server.flavor.cpuCount } returns 8 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testInstanceCountFilter() { + val scheduler = FilterScheduler( + filters = listOf(InstanceCountFilter(limit = 2)), + weighers = emptyList(), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(RamWeigher(1.5)), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostA, scheduler.select(server)) + } + + @Test + fun testCoreRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(CoreRamWeigher(1.5)), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(VCpuWeigher(16.0)), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 2 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testInstanceCountWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } +} -- cgit v1.2.3 From f111081627280d4e7e1d7147c56cdce708e32433 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 14:06:39 +0200 Subject: build: Upgrade to OpenTelemetry 1.5 This change upgrades the OpenTelemetry dependency to version 1.5, which contains various breaking changes in the metrics API. --- .../opendc/compute/service/internal/ComputeServiceImpl.kt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index e7807177..d7a7e8f8 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -106,7 +106,7 @@ internal class ComputeServiceImpl( /** * The number of servers that have been submitted to the service for provisioning. */ - private val _submittedServers = meter.longCounterBuilder("servers.submitted") + private val _submittedServers = meter.counterBuilder("servers.submitted") .setDescription("Number of start requests") .setUnit("1") .build() @@ -114,7 +114,7 @@ internal class ComputeServiceImpl( /** * The number of servers that failed to be scheduled. */ - private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") + private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") .setDescription("Number of unscheduled servers") .setUnit("1") .build() @@ -122,7 +122,7 @@ internal class ComputeServiceImpl( /** * The number of servers that are waiting to be provisioned. */ - private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") + private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") .setDescription("Number of servers waiting to be provisioned") .setUnit("1") .build() @@ -130,7 +130,7 @@ internal class ComputeServiceImpl( /** * The number of servers that are waiting to be provisioned. */ - private val _runningServers = meter.longUpDownCounterBuilder("servers.active") + private val _runningServers = meter.upDownCounterBuilder("servers.active") .setDescription("Number of servers currently running") .setUnit("1") .build() @@ -138,7 +138,7 @@ internal class ComputeServiceImpl( /** * The number of servers that have finished running. */ - private val _finishedServers = meter.longCounterBuilder("servers.finished") + private val _finishedServers = meter.counterBuilder("servers.finished") .setDescription("Number of servers that finished running") .setUnit("1") .build() @@ -146,7 +146,7 @@ internal class ComputeServiceImpl( /** * The number of hosts registered at the compute service. */ - private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") + private val _hostCount = meter.upDownCounterBuilder("hosts.total") .setDescription("Number of hosts") .setUnit("1") .build() @@ -154,7 +154,7 @@ internal class ComputeServiceImpl( /** * The number of available hosts registered at the compute service. */ - private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") + private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") .setDescription("Number of available hosts") .setUnit("1") .build() -- cgit v1.2.3 From 2dd9cc6ded8c47046f4faa1b2dd6aed1085d6644 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 17:38:53 +0200 Subject: feat(compute): Track provisioning response time This change adds a metric for the provisioning time of virtual machines by the compute service. --- .../compute/service/internal/ComputeServiceImpl.kt | 17 +++++++++++++++-- .../org/opendc/compute/service/InternalServerTest.kt | 6 +++--- 2 files changed, 18 insertions(+), 5 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index d7a7e8f8..8c043142 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,7 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -159,6 +161,15 @@ internal class ComputeServiceImpl( .setUnit("1") .build() + /** + * The response time of the service. + */ + private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") + .build() + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ @@ -325,7 +336,7 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - val request = SchedulingRequest(server) + val request = SchedulingRequest(server, clock.millis()) queue.add(request) _submittedServers.add(1) _waitingServers.add(1) @@ -368,6 +379,7 @@ internal class ComputeServiceImpl( * Run a single scheduling iteration. */ private fun doSchedule() { + val now = clock.millis() while (queue.isNotEmpty()) { val request = queue.peek() @@ -402,6 +414,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) + _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) logger.info { "Assigned server $server to host $host." } @@ -430,7 +443,7 @@ internal class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - internal data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) { /** * A flag to indicate that the request is cancelled. */ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 20ea8d20..28fd8217 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -102,7 +102,7 @@ class InternalServerTest { val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } server.start() @@ -160,7 +160,7 @@ class InternalServerTest { val flavor = mockk() val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -223,7 +223,7 @@ class InternalServerTest { val flavor = mockk() val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request -- cgit v1.2.3 From 1e8c8ebd2b537d3795022c3222d3e37b6e61e624 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:32:07 +0200 Subject: fix(compute): Mark unschedulable server as terminated This change updates the compute service to mark servers that cannot be scheduled as terminated instead of error. Error is instead reserved for cases where the server is in an error state while running. --- .../org/opendc/compute/service/internal/ComputeServiceImpl.kt | 2 +- .../test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 8c043142..f1c055d4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -402,7 +402,7 @@ internal class ComputeServiceImpl( logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") - server.state = ServerState.ERROR + server.state = ServerState.TERMINATED continue } else { break diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index c6c01ea2..d036ec00 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -170,7 +170,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -183,7 +183,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -196,7 +196,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test -- cgit v1.2.3 From 36be169ae8a159a824a2b20cc0eb0e04b569fa09 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 16:05:23 +0200 Subject: docs(compute): Clarify terminology in compute service --- .../src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index 5632a55e..fc092a3f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,7 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * - * @property vcpuCount The number of logical processing cores available for this host. + * @property cpuCount The number of logical processing cores available for this host. * @property memorySize The amount of memory available for this host in MB. */ public data class HostModel(public val cpuCount: Int, public val memorySize: Long) -- cgit v1.2.3 From 3ca64e0110adab65526a0ccfd5b252e9f047ab10 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 14:41:05 +0200 Subject: refactor(telemetry): Create separate MeterProvider per service/host This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario. --- .../org/opendc/compute/service/ComputeService.kt | 10 ++- .../compute/service/internal/ComputeServiceImpl.kt | 26 ++++--- .../compute/service/internal/InternalServer.kt | 18 +++++ .../opendc/compute/service/ComputeServiceTest.kt | 3 +- .../opendc/compute/service/InternalServerTest.kt | 81 +++++++++++++--------- 5 files changed, 93 insertions(+), 45 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f1c055d4..824becf4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,9 +22,8 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -35,6 +34,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -42,15 +42,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -62,6 +65,11 @@ internal class ComputeServiceImpl( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + /** * The [Random] instance used to generate unique identifiers for the objects. */ @@ -365,10 +373,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -414,7 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) + _schedulerDuration.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -49,6 +52,21 @@ internal class InternalServer( */ private val watchers = mutableListOf() + /** + * The attributes of a server. + */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + /** * The [Host] that has been assigned to host the server. */ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index d036ec00..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -61,8 +61,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 28fd8217..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,8 +99,8 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,8 +158,8 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk(relaxUnitFun = true) @@ -220,8 +221,8 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } -- cgit v1.2.3 From 8d899e29dbd757f6df320212d6e0d77ce8216ab9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 15:38:38 +0200 Subject: refactor(telemetry): Standardize compute scheduler metrics This change updates the OpenDC compute service implementation with multiple meters that follow the OpenTelemetry conventions. --- .../compute/service/internal/ComputeServiceImpl.kt | 137 +++++++++------------ 1 file changed, 58 insertions(+), 79 deletions(-) (limited to 'opendc-compute/opendc-compute-service/src') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 824becf4..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,6 +22,8 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* @@ -114,69 +116,37 @@ internal class ComputeServiceImpl( private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. + * The number of scheduling attempts. */ - private val _submittedServers = meter.counterBuilder("servers.submitted") - .setDescription("Number of start requests") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _runningServers = meter.upDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") - .setUnit("1") - .build() - - /** - * The number of servers that have finished running. - */ - private val _finishedServers = meter.counterBuilder("servers.finished") - .setDescription("Number of servers that finished running") - .setUnit("1") - .build() - - /** - * The number of hosts registered at the compute service. + * The response time of the service. */ - private val _hostCount = meter.upDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of available hosts registered at the compute service. + * The number of servers that are pending. */ - private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() - - /** - * The response time of the service. - */ - private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -189,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -316,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -346,8 +327,7 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -395,7 +375,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -407,10 +387,10 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } server.state = ServerState.TERMINATED continue @@ -423,8 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, server.attributes) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -439,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -463,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -498,16 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.ERROR) { - _runningServers.add(-1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { -- cgit v1.2.3