summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-service/src/main')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt154
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt50
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt)23
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt)8
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt)9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt36
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt)15
13 files changed, 286 insertions, 145 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index 5632a55e..fc092a3f 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
- * @property vcpuCount The number of logical processing cores available for this host.
+ * @property cpuCount The number of logical processing cores available for this host.
* @property memorySize The amount of memory available for this host in MB.
*/
public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 8af5f86e..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,7 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -33,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -40,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -61,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -104,60 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.longCounterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
+ * The number of scheduling attempts.
*/
- private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of servers that have finished running.
+ * The response time of the service.
*/
- private val _finishedServers = meter.longCounterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
- .build()
-
- /**
- * The number of available hosts registered at the compute service.
- */
- private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -170,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -297,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -325,10 +325,9 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- val request = SchedulingRequest(server)
+ val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -354,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -368,12 +369,13 @@ internal class ComputeServiceImpl(
* Run a single scheduling iteration.
*/
private fun doSchedule() {
+ val now = clock.millis()
while (queue.isNotEmpty()) {
val request = queue.peek()
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -385,12 +387,12 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
- server.state = ServerState.ERROR
+ server.state = ServerState.TERMINATED
continue
} else {
break
@@ -401,7 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -416,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -430,7 +438,7 @@ internal class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- internal data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) {
/**
* A flag to indicate that the request is cancelled.
*/
@@ -440,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -475,14 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 0fd5b2a4..8c2d4715 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,6 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
+import java.util.*
+import kotlin.math.min
/**
* A [ComputeScheduler] implementation that uses filtering and weighing passes to select
@@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher
*
* This implementation is based on the filter scheduler from OpenStack Nova.
* See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ *
+ * @param filters The list of filters to apply when searching for an appropriate host.
+ * @param weighers The list of weighers to apply when searching for an appropriate host.
+ * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
+ * @param random A [Random] instance for selecting
*/
-public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler {
+public class FilterScheduler(
+ private val filters: List<HostFilter>,
+ private val weighers: List<HostWeigher>,
+ private val subsetSize: Int = 1,
+ private val random: Random = Random(0)
+) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
*/
private val hosts = mutableListOf<HostView>()
+ init {
+ require(subsetSize >= 1) { "Subset size must be one or greater" }
+ }
+
override fun addHost(host: HostView) {
hosts.add(host)
}
@@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val
}
override fun select(server: Server): HostView? {
- return hosts.asSequence()
- .filter { host ->
- for (filter in filters) {
- if (!filter.test(host, server))
- return@filter false
+ val hosts = hosts
+ val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } }
+
+ val subset = if (weighers.isNotEmpty()) {
+ val results = weighers.map { it.getWeights(filteredHosts, server) }
+ val weights = DoubleArray(filteredHosts.size)
+
+ for (result in results) {
+ val min = result.min
+ val range = (result.max - min)
+
+ // Skip result if all weights are the same
+ if (range == 0.0) {
+ continue
}
- true
- }
- .sortedByDescending { host ->
- weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor }
+ val multiplier = result.multiplier
+ val factor = multiplier / range
+
+ for ((i, weight) in result.weights.withIndex()) {
+ weights[i] += factor * (weight - min)
+ }
}
- .firstOrNull()
+
+ weights.indices
+ .asSequence()
+ .sortedByDescending { weights[it] }
+ .map { filteredHosts[it] }
+ .take(subsetSize)
+ .toList()
+ } else {
+ filteredHosts
+ }
+
+ return when (val maxSize = min(subsetSize, subset.size)) {
+ 0 -> null
+ 1 -> subset[0]
+ else -> subset[random.nextInt(maxSize)]
+ }
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
new file mode 100644
index 00000000..a470a453
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host.
+ *
+ * @param allocationRatio Virtual RAM to physical RAM allocation ratio.
+ */
+public class RamFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requested = server.flavor.memorySize
+ val available = host.availableMemory
+ val total = host.host.model.memorySize
+
+ // Do not allow an instance to overcommit against itself, only against
+ // other instances.
+ if (requested > total) {
+ return false
+ }
+
+ val limit = total * allocationRatio
+ val used = total - available
+ val usable = limit - used
+ return usable >= requested
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
index 072440c5..abdd79f1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
@@ -26,15 +26,22 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server
- * flavor.
+ * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ComputeCapabilitiesFilter : HostFilter {
+public class VCpuFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
- val fitsMemory = host.availableMemory >= server.flavor.memorySize
- val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount
- return fitsMemory && fitsCpu
- }
+ val requested = server.flavor.cpuCount
+ val total = host.host.model.cpuCount
+ val limit = total * allocationRatio
- override fun toString(): String = "ComputeCapabilitiesFilter"
+ // Do not allow an instance to overcommit against itself, only against other instances
+ if (requested > total) {
+ return false
+ }
+
+ val free = limit - host.provisionedCores
+ return free >= requested
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
index 12e6510e..d668fdaf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
@@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available core memory, and a negative number will result in the scheduler preferring hosts with less available core
+ * memory.
*/
-public class CoreMemoryWeigher : HostWeigher {
+public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble() / host.host.model.cpuCount
}
- override fun toString(): String = "CoreMemoryWeigher"
+ override fun toString(): String = "CoreRamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
index d48ee9e0..aca8c4e6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
@@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler
/**
* An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
*/
-public fun interface HostWeigher {
+public interface HostWeigher {
+ /**
+ * The multiplier for the weigher.
+ */
+ public val multiplier: Double
+
/**
* Obtain the weight of the specified [host] when scheduling the specified [server].
*/
public fun getWeight(host: HostView, server: Server): Double
+
+ /**
+ * Obtain the weights for [hosts] when scheduling the specified [server].
+ */
+ public fun getWeights(hosts: List<HostView>, server: Server): Result {
+ val weights = DoubleArray(hosts.size)
+ var min = Double.MAX_VALUE
+ var max = Double.MIN_VALUE
+
+ for ((i, host) in hosts.withIndex()) {
+ val weight = getWeight(host, server)
+ weights[i] = weight
+ min = kotlin.math.min(min, weight)
+ max = kotlin.math.max(max, weight)
+ }
+
+ return Result(weights, min, max, multiplier)
+ }
+
+ /**
+ * A result returned by the weigher.
+ *
+ * @param weights The weights returned by the weigher.
+ * @param min The minimum weight returned.
+ * @param max The maximum weight returned.
+ * @param multiplier The weight multiplier to use.
+ */
+ public class Result(
+ public val weights: DoubleArray,
+ public val min: Double,
+ public val max: Double,
+ public val multiplier: Double,
+ )
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
index 2ef733e5..732cbe03 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
@@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the number of instances on the host.
*/
-public class InstanceCountWeigher : HostWeigher {
+public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.instanceCount.toDouble()
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
index 115d8e4d..d18d31f4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
@@ -26,12 +26,15 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the available memory on the host.
+ * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available memory, and a negative number will result in the scheduler preferring hosts with less memory.
*/
-public class MemoryWeigher : HostWeigher {
+public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble()
}
- override fun toString(): String = "MemoryWeigher"
+ override fun toString(): String = "RamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
deleted file mode 100644
index 1615df3a..00000000
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler.weights
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
-import java.util.*
-
-/**
- * A [HostWeigher] that assigns random weights to each host every selection.
- */
-public class RandomWeigher(private val random: Random) : HostWeigher {
- override fun getWeight(host: HostView, server: Server): Double = random.nextDouble()
-
- override fun toString(): String = "RandomWeigher"
-}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
index df5bcd6e..4a22269b 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
@@ -26,12 +26,19 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host.
+ * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ProvisionedCoresWeigher : HostWeigher {
+public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher {
+
+ init {
+ require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" }
+ }
+
override fun getWeight(host: HostView, server: Server): Double {
- return host.provisionedCores.toDouble()
+ return host.host.model.cpuCount * allocationRatio - host.provisionedCores
}
- override fun toString(): String = "ProvisionedCoresWeigher"
+ override fun toString(): String = "VCpuWeigher"
}