summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-service/src')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt154
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt50
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt)23
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt)8
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt)9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt36
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt)15
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt87
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt407
16 files changed, 765 insertions, 200 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index 5632a55e..fc092a3f 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
- * @property vcpuCount The number of logical processing cores available for this host.
+ * @property cpuCount The number of logical processing cores available for this host.
* @property memorySize The amount of memory available for this host in MB.
*/
public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 8af5f86e..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,7 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -33,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -40,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -61,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -104,60 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.longCounterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
+ * The number of scheduling attempts.
*/
- private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of servers that have finished running.
+ * The response time of the service.
*/
- private val _finishedServers = meter.longCounterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
- .build()
-
- /**
- * The number of available hosts registered at the compute service.
- */
- private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -170,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -297,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -325,10 +325,9 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- val request = SchedulingRequest(server)
+ val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -354,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -368,12 +369,13 @@ internal class ComputeServiceImpl(
* Run a single scheduling iteration.
*/
private fun doSchedule() {
+ val now = clock.millis()
while (queue.isNotEmpty()) {
val request = queue.peek()
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -385,12 +387,12 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
- server.state = ServerState.ERROR
+ server.state = ServerState.TERMINATED
continue
} else {
break
@@ -401,7 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -416,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -430,7 +438,7 @@ internal class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- internal data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) {
/**
* A flag to indicate that the request is cancelled.
*/
@@ -440,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -475,14 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 0fd5b2a4..8c2d4715 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,6 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
+import java.util.*
+import kotlin.math.min
/**
* A [ComputeScheduler] implementation that uses filtering and weighing passes to select
@@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher
*
* This implementation is based on the filter scheduler from OpenStack Nova.
* See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ *
+ * @param filters The list of filters to apply when searching for an appropriate host.
+ * @param weighers The list of weighers to apply when searching for an appropriate host.
+ * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
+ * @param random A [Random] instance for selecting
*/
-public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler {
+public class FilterScheduler(
+ private val filters: List<HostFilter>,
+ private val weighers: List<HostWeigher>,
+ private val subsetSize: Int = 1,
+ private val random: Random = Random(0)
+) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
*/
private val hosts = mutableListOf<HostView>()
+ init {
+ require(subsetSize >= 1) { "Subset size must be one or greater" }
+ }
+
override fun addHost(host: HostView) {
hosts.add(host)
}
@@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val
}
override fun select(server: Server): HostView? {
- return hosts.asSequence()
- .filter { host ->
- for (filter in filters) {
- if (!filter.test(host, server))
- return@filter false
+ val hosts = hosts
+ val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } }
+
+ val subset = if (weighers.isNotEmpty()) {
+ val results = weighers.map { it.getWeights(filteredHosts, server) }
+ val weights = DoubleArray(filteredHosts.size)
+
+ for (result in results) {
+ val min = result.min
+ val range = (result.max - min)
+
+ // Skip result if all weights are the same
+ if (range == 0.0) {
+ continue
}
- true
- }
- .sortedByDescending { host ->
- weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor }
+ val multiplier = result.multiplier
+ val factor = multiplier / range
+
+ for ((i, weight) in result.weights.withIndex()) {
+ weights[i] += factor * (weight - min)
+ }
}
- .firstOrNull()
+
+ weights.indices
+ .asSequence()
+ .sortedByDescending { weights[it] }
+ .map { filteredHosts[it] }
+ .take(subsetSize)
+ .toList()
+ } else {
+ filteredHosts
+ }
+
+ return when (val maxSize = min(subsetSize, subset.size)) {
+ 0 -> null
+ 1 -> subset[0]
+ else -> subset[random.nextInt(maxSize)]
+ }
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
new file mode 100644
index 00000000..a470a453
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host.
+ *
+ * @param allocationRatio Virtual RAM to physical RAM allocation ratio.
+ */
+public class RamFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requested = server.flavor.memorySize
+ val available = host.availableMemory
+ val total = host.host.model.memorySize
+
+ // Do not allow an instance to overcommit against itself, only against
+ // other instances.
+ if (requested > total) {
+ return false
+ }
+
+ val limit = total * allocationRatio
+ val used = total - available
+ val usable = limit - used
+ return usable >= requested
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
index 072440c5..abdd79f1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
@@ -26,15 +26,22 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server
- * flavor.
+ * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ComputeCapabilitiesFilter : HostFilter {
+public class VCpuFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
- val fitsMemory = host.availableMemory >= server.flavor.memorySize
- val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount
- return fitsMemory && fitsCpu
- }
+ val requested = server.flavor.cpuCount
+ val total = host.host.model.cpuCount
+ val limit = total * allocationRatio
- override fun toString(): String = "ComputeCapabilitiesFilter"
+ // Do not allow an instance to overcommit against itself, only against other instances
+ if (requested > total) {
+ return false
+ }
+
+ val free = limit - host.provisionedCores
+ return free >= requested
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
index 12e6510e..d668fdaf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
@@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available core memory, and a negative number will result in the scheduler preferring hosts with less available core
+ * memory.
*/
-public class CoreMemoryWeigher : HostWeigher {
+public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble() / host.host.model.cpuCount
}
- override fun toString(): String = "CoreMemoryWeigher"
+ override fun toString(): String = "CoreRamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
index d48ee9e0..aca8c4e6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
@@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler
/**
* An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
*/
-public fun interface HostWeigher {
+public interface HostWeigher {
+ /**
+ * The multiplier for the weigher.
+ */
+ public val multiplier: Double
+
/**
* Obtain the weight of the specified [host] when scheduling the specified [server].
*/
public fun getWeight(host: HostView, server: Server): Double
+
+ /**
+ * Obtain the weights for [hosts] when scheduling the specified [server].
+ */
+ public fun getWeights(hosts: List<HostView>, server: Server): Result {
+ val weights = DoubleArray(hosts.size)
+ var min = Double.MAX_VALUE
+ var max = Double.MIN_VALUE
+
+ for ((i, host) in hosts.withIndex()) {
+ val weight = getWeight(host, server)
+ weights[i] = weight
+ min = kotlin.math.min(min, weight)
+ max = kotlin.math.max(max, weight)
+ }
+
+ return Result(weights, min, max, multiplier)
+ }
+
+ /**
+ * A result returned by the weigher.
+ *
+ * @param weights The weights returned by the weigher.
+ * @param min The minimum weight returned.
+ * @param max The maximum weight returned.
+ * @param multiplier The weight multiplier to use.
+ */
+ public class Result(
+ public val weights: DoubleArray,
+ public val min: Double,
+ public val max: Double,
+ public val multiplier: Double,
+ )
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
index 2ef733e5..732cbe03 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
@@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the number of instances on the host.
*/
-public class InstanceCountWeigher : HostWeigher {
+public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.instanceCount.toDouble()
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
index 115d8e4d..d18d31f4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
@@ -26,12 +26,15 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the available memory on the host.
+ * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available memory, and a negative number will result in the scheduler preferring hosts with less memory.
*/
-public class MemoryWeigher : HostWeigher {
+public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble()
}
- override fun toString(): String = "MemoryWeigher"
+ override fun toString(): String = "RamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
deleted file mode 100644
index 1615df3a..00000000
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler.weights
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
-import java.util.*
-
-/**
- * A [HostWeigher] that assigns random weights to each host every selection.
- */
-public class RandomWeigher(private val random: Random) : HostWeigher {
- override fun getWeight(host: HostView, server: Server): Double = random.nextDouble()
-
- override fun toString(): String = "RandomWeigher"
-}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
index df5bcd6e..4a22269b 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
@@ -26,12 +26,19 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host.
+ * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ProvisionedCoresWeigher : HostWeigher {
+public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher {
+
+ init {
+ require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" }
+ }
+
override fun getWeight(host: HostView, server: Server): Double {
- return host.provisionedCores.toDouble()
+ return host.host.model.cpuCount * allocationRatio - host.provisionedCores
}
- override fun toString(): String = "ProvisionedCoresWeigher"
+ override fun toString(): String = "VCpuWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index a6258845..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.MemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
import java.util.*
@@ -57,11 +58,10 @@ internal class ComputeServiceTest {
scope = SimulationCoroutineScope()
val clock = scope.clock
val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
+ weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
@@ -167,9 +167,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -180,9 +180,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -193,9 +193,9 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -207,7 +207,7 @@ internal class ComputeServiceTest {
server.start()
server.stop()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.TERMINATED, server.state)
}
@@ -228,7 +228,7 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(10 * 60 * 1000)
+ delay(10L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -254,12 +254,12 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
every { host.state } returns HostState.UP
listeners.forEach { it.onStateChanged(host, HostState.UP) }
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -284,13 +284,13 @@ internal class ComputeServiceTest {
val image = client.newImage("test")
val server = client.newServer("test", image, flavor, start = false)
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
every { host.state } returns HostState.DOWN
listeners.forEach { it.onStateChanged(host, HostState.DOWN) }
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
@@ -344,7 +344,7 @@ internal class ComputeServiceTest {
// Start server
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
coVerify { host.spawn(capture(slot), true) }
listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
@@ -383,7 +383,7 @@ internal class ComputeServiceTest {
val server = client.newServer("test", image, flavor, start = false)
server.start()
- delay(5 * 60 * 1000)
+ delay(5L * 60 * 1000)
server.refresh()
assertEquals(ServerState.PROVISIONING, server.state)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 20ea8d20..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,11 +99,11 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) }
+ every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
server.start()
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,10 +158,10 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,10 +221,10 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
new file mode 100644
index 00000000..cafd4498
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
@@ -0,0 +1,407 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.driver.HostModel
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.InstanceCountFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Test suite for the [FilterScheduler].
+ */
+internal class FilterSchedulerTest {
+ @Test
+ fun testInvalidSubsetSize() {
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = 0
+ )
+ }
+
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = -2
+ )
+ }
+ }
+
+ @Test
+ fun testNoHosts() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulers() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostA, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulersRandom() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(1)
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostB, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testHostIsDown() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.DOWN
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testHostIsUp() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(host, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 512
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 2048
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.5)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.availableMemory } returns 2048
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 2300
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 3
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(16.0)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.provisionedCores } returns 0
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 8
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(InstanceCountFilter(limit = 2)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(RamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostA, scheduler.select(server))
+ }
+
+ @Test
+ fun testCoreRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(CoreRamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(12, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(VCpuWeigher(16.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+}