summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-08 20:46:39 +0200
committerGitHub <noreply@github.com>2021-04-08 20:46:39 +0200
commit3820ac4d31d6eb04034b85a1b53667d64ce6ba89 (patch)
treea7506b631770d6032eddc5a8252931b03c6e1796
parent5fdbfbe7d340bc10f8b9eebd5aa23bdfd7dc4e18 (diff)
parent4f80e79b567b7d91b1086dcd74ef35616d7177f2 (diff)
compute: Implement filter scheduler
This pull request implements the filter scheduler modeled after the scheduler from [OpenStack](https://docs.openstack.org/nova/latest/user/filter-scheduler.html). The scheduler is functionally equivalent to the old allocation policies, but is more flexible and allows policies to be combined. * A new interface, `ComputeScheduler` is introduced, which is used by the `ComputeServiceImpl` to pick hosts to schedule on. * `FilterScheduler` is implemented, which works by filtering and weighing the available hosts. **Breaking API Changes** * Removal of the `AllocationPolicy` interface and its implementations. Users should migrate to the filter scheduler which offers the same functionality and more.
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt25
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt9
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt)35
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt66
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt40
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt48
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt)42
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt)21
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt)20
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt38
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt)18
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt37
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt (renamed from simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt)26
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt37
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt37
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt37
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt36
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt14
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt219
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt16
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt63
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt21
-rw-r--r--simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt26
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt54
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt11
26 files changed, 539 insertions, 465 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 98566da3..1873eb99 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -26,7 +26,7 @@ import io.opentelemetry.api.metrics.Meter
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
-import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -70,16 +70,16 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param allocationPolicy The allocation policy to use.
+ * @param scheduler The scheduler implementation to use.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
meter: Meter,
- allocationPolicy: AllocationPolicy,
+ scheduler: ComputeScheduler,
schedulingQuantum: Long = 300000,
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
}
}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 26a34ad9..8af5f86e 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -30,7 +30,7 @@ import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
import java.util.*
@@ -47,7 +47,7 @@ internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
private val meter: Meter,
- private val allocationPolicy: AllocationPolicy,
+ private val scheduler: ComputeScheduler,
private val schedulingQuantum: Long
) : ComputeService, HostListener {
/**
@@ -160,14 +160,9 @@ internal class ComputeServiceImpl(
.build()
/**
- * The allocation logic to use.
- */
- private val allocationLogic = allocationPolicy()
-
- /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
- private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+ private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
override val hosts: Set<Host>
get() = hostToView.keys
@@ -306,6 +301,7 @@ internal class ComputeServiceImpl(
availableHosts += hv
}
+ scheduler.addHost(hv)
_hostCount.add(1)
host.addListener(this)
}
@@ -316,6 +312,7 @@ internal class ComputeServiceImpl(
if (availableHosts.remove(view)) {
_availableHostCount.add(-1)
}
+ scheduler.removeHost(view)
host.removeListener(this)
_hostCount.add(-1)
}
@@ -353,7 +350,7 @@ internal class ComputeServiceImpl(
*/
private fun requestSchedulingCycle() {
// Bail out in case we have already requested a new cycle or the queue is empty.
- if (scheduler.isTimerActive(Unit) || queue.isEmpty()) {
+ if (timerScheduler.isTimerActive(Unit) || queue.isEmpty()) {
return
}
@@ -362,7 +359,7 @@ internal class ComputeServiceImpl(
// We calculate here the delay until the next scheduling slot.
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
- scheduler.startSingleTimer(Unit, delay) {
+ timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
}
}
@@ -381,7 +378,7 @@ internal class ComputeServiceImpl(
}
val server = request.server
- val hv = allocationLogic.select(availableHosts, request.server)
+ val hv = scheduler.select(request.server)
if (hv == null || !hv.host.canFit(server)) {
logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" }
@@ -410,7 +407,7 @@ internal class ComputeServiceImpl(
// Speculatively update the hypervisor view information to prevent other images in the queue from
// deciding on stale values.
- hv.numberOfActiveServers++
+ hv.instanceCount++
hv.provisionedCores += server.flavor.cpuCount
hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
@@ -422,7 +419,7 @@ internal class ComputeServiceImpl(
} catch (e: Throwable) {
logger.error("Failed to deploy VM", e)
- hv.numberOfActiveServers--
+ hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
}
@@ -490,7 +487,7 @@ internal class ComputeServiceImpl(
val hv = hostToView[host]
if (hv != null) {
hv.provisionedCores -= server.flavor.cpuCount
- hv.numberOfActiveServers--
+ hv.instanceCount--
hv.availableMemory += server.flavor.memorySize
} else {
logger.error { "Unknown host $host" }
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
index 5793541f..e2f33f11 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
@@ -22,14 +22,21 @@
package org.opendc.compute.service.internal
+import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import java.util.UUID
+/**
+ * A view of a [Host] as seen from the [ComputeService]
+ */
public class HostView(public val host: Host) {
+ /**
+ * The unique identifier of the host.
+ */
public val uid: UUID
get() = host.uid
- public var numberOfActiveServers: Int = 0
+ public var instanceCount: Int = 0
public var availableMemory: Long = host.model.memorySize
public var provisionedCores: Int = 0
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt
index 265d514d..a2ab3a2e 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeScheduler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -23,27 +23,28 @@
package org.opendc.compute.service.scheduler
import org.opendc.compute.api.Server
+import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.internal.HostView
/**
- * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node.
+ * A generic scheduler interface used by the [ComputeService] to select hosts to place [Server]s on.
*/
-public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
+public interface ComputeScheduler {
/**
- * The comparator to use.
+ * Register the specified [host] to be used for scheduling.
*/
- public val comparator: Comparator<HostView>
+ public fun addHost(host: HostView)
- override fun select(
- hypervisors: Set<HostView>,
- server: Server
- ): HostView? {
- return hypervisors.asSequence()
- .filter { hv ->
- val fitsMemory = hv.availableMemory >= (server.flavor.memorySize)
- val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount
- fitsMemory && fitsCpu
- }
- .minWithOrNull(comparator.thenBy { it.host.uid })
- }
+ /**
+ * Remove the specified [host] to be removed from the scheduling pool.
+ */
+ public fun removeHost(host: HostView)
+
+ /**
+ * Select a host for the specified [server].
+ *
+ * @param server The server to select a host for.
+ * @return The host to schedule the server on or `null` if no server is available.
+ */
+ public fun select(server: Server): HostView?
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
new file mode 100644
index 00000000..0fd5b2a4
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.filters.HostFilter
+import org.opendc.compute.service.scheduler.weights.HostWeigher
+
+/**
+ * A [ComputeScheduler] implementation that uses filtering and weighing passes to select
+ * the host to schedule a [Server] on.
+ *
+ * This implementation is based on the filter scheduler from OpenStack Nova.
+ * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ */
+public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler {
+ /**
+ * The pool of hosts available to the scheduler.
+ */
+ private val hosts = mutableListOf<HostView>()
+
+ override fun addHost(host: HostView) {
+ hosts.add(host)
+ }
+
+ override fun removeHost(host: HostView) {
+ hosts.remove(host)
+ }
+
+ override fun select(server: Server): HostView? {
+ return hosts.asSequence()
+ .filter { host ->
+ for (filter in filters) {
+ if (!filter.test(host, server))
+ return@filter false
+ }
+
+ true
+ }
+ .sortedByDescending { host ->
+ weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor }
+ }
+ .firstOrNull()
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt
deleted file mode 100644
index 4c196953..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler
-
-import org.opendc.compute.service.internal.HostView
-
-/**
- * An [AllocationPolicy] that takes into account the number of vCPUs that have been provisioned on this machine
- * relative to its core count.
- *
- * @param reversed A flag to reverse the order of the policy, such that the machine with the most provisioned cores
- * is selected.
- */
-public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
- override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HostView> =
- compareBy<HostView> { it.provisionedCores / it.host.model.cpuCount }
- .run { if (reversed) reversed() else this }
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
deleted file mode 100644
index 006e0d1c..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
-import kotlin.random.Random
-
-/**
- * An [AllocationPolicy] that select a random node on which the server fits.
- */
-public class RandomAllocationPolicy(private val random: Random = Random(0)) : AllocationPolicy {
- @OptIn(ExperimentalStdlibApi::class)
- override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
- override fun select(
- hypervisors: Set<HostView>,
- server: Server
- ): HostView? {
- return hypervisors.asIterable()
- .filter { hv ->
- val fitsMemory = hv.availableMemory >= server.flavor.memorySize
- val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount
- fitsMemory && fitsCpu
- }
- .randomOrNull(random)
- }
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt
index 2c953f8b..284c1f91 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt
@@ -32,25 +32,33 @@ import org.opendc.compute.service.internal.HostView
* Within each cluster, the active servers on each node determine which node gets
* assigned the VM image.
*/
-public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy {
+public class ReplayScheduler(private val vmPlacements: Map<String, String>) : ComputeScheduler {
private val logger = KotlinLogging.logger {}
- override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
- override fun select(
- hypervisors: Set<HostView>,
- server: Server
- ): HostView? {
- val clusterName = vmPlacements[server.name]
- ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}")
- val machinesInCluster = hypervisors.filter { it.host.name.contains(clusterName) }
-
- if (machinesInCluster.isEmpty()) {
- logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." }
- return hypervisors.maxByOrNull { it.availableMemory }
- }
-
- return machinesInCluster.maxByOrNull { it.availableMemory }
- ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign")
+ /**
+ * The pool of hosts available to the scheduler.
+ */
+ private val hosts = mutableListOf<HostView>()
+
+ override fun addHost(host: HostView) {
+ hosts.add(host)
+ }
+
+ override fun removeHost(host: HostView) {
+ hosts.remove(host)
+ }
+
+ override fun select(server: Server): HostView? {
+ val clusterName = vmPlacements[server.name]
+ ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}")
+ val machinesInCluster = hosts.filter { it.host.name.contains(clusterName) }
+
+ if (machinesInCluster.isEmpty()) {
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." }
+ return hosts.maxByOrNull { it.availableMemory }
}
+
+ return machinesInCluster.maxByOrNull { it.availableMemory }
+ ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign")
}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
index 29eba782..072440c5 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,18 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.compute.service.scheduler
+package org.opendc.compute.service.scheduler.filters
+import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * Allocation policy that selects the node with the least amount of active servers.
- *
- * @param reversed A flag to reverse the order, such that the node with the most active servers is selected.
+ * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server
+ * flavor.
*/
-public class NumberOfActiveServersAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy {
- override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HostView> = compareBy<HostView> { it.numberOfActiveServers }
- .run { if (reversed) reversed() else this }
+public class ComputeCapabilitiesFilter : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val fitsMemory = host.availableMemory >= server.flavor.memorySize
+ val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount
+ return fitsMemory && fitsCpu
}
+
+ override fun toString(): String = "ComputeCapabilitiesFilter"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt
index ad422415..fb842415 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.compute.service.scheduler
+package org.opendc.compute.service.scheduler.filters
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.internal.HostView
/**
- * An [AllocationPolicy] that selects the machine with the highest/lowest amount of memory per core.
- *
- * @param reversed An option to reverse the order of the machines (lower amount of memory scores better).
+ * A [HostFilter] that filters on active hosts.
*/
-public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
- override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HostView> =
- compareBy<HostView> { -it.availableMemory / it.host.model.cpuCount }
- .run { if (reversed) reversed() else this }
+public class ComputeFilter : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ return host.host.state == HostState.UP
}
+
+ override fun toString(): String = "ComputeFilter"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt
new file mode 100644
index 00000000..9e909ca6
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.FilterScheduler
+
+/**
+ * A filter used by the [FilterScheduler] to filter hosts.
+ */
+public fun interface HostFilter {
+ /**
+ * Test whether the specified [host] should be included in the selection
+ * for scheduling the specified [server].
+ */
+ public fun test(host: HostView, server: Server): Boolean
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt
index 6712b8a2..ed6674b1 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,18 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.compute.service.scheduler
+package org.opendc.compute.service.scheduler.filters
+import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * Allocation policy that selects the node with the most available memory.
+ * A [HostFilter] that filters hosts based on the number of instances on the host.
*
- * @param reversed A flag to reverse the order (least amount of memory scores the best).
+ * @param limit The maximum number of instances on the host.
*/
-public class AvailableMemoryAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy {
- override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HostView> = compareBy<HostView> { -it.availableMemory }
- .run { if (reversed) reversed() else this }
+public class InstanceCountFilter(private val limit: Int) : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ return host.instanceCount < limit
}
+
+ override fun toString(): String = "InstanceCountFilter[limit=$limit]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
new file mode 100644
index 00000000..12e6510e
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ */
+public class CoreMemoryWeigher : HostWeigher {
+ override fun getWeight(host: HostView, server: Server): Double {
+ return host.availableMemory.toDouble() / host.host.model.cpuCount
+ }
+
+ override fun toString(): String = "CoreMemoryWeigher"
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
index 5ee4c70f..d48ee9e0 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,30 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.compute.service.scheduler
+package org.opendc.compute.service.scheduler.weights
import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.FilterScheduler
/**
- * A policy for selecting the [Node] an image should be deployed to,
+ * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
*/
-public interface AllocationPolicy {
+public fun interface HostWeigher {
/**
- * The logic of the allocation policy.
+ * Obtain the weight of the specified [host] when scheduling the specified [server].
*/
- public interface Logic {
- /**
- * Select the node on which the server should be scheduled.
- */
- public fun select(
- hypervisors: Set<HostView>,
- server: Server
- ): HostView?
- }
-
- /**
- * Builds the logic of the policy.
- */
- public operator fun invoke(): Logic
+ public fun getWeight(host: HostView, server: Server): Double
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
new file mode 100644
index 00000000..2ef733e5
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the number of instances on the host.
+ */
+public class InstanceCountWeigher : HostWeigher {
+ override fun getWeight(host: HostView, server: Server): Double {
+ return host.instanceCount.toDouble()
+ }
+
+ override fun toString(): String = "InstanceCountWeigher"
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
new file mode 100644
index 00000000..115d8e4d
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the available memory on the host.
+ */
+public class MemoryWeigher : HostWeigher {
+ override fun getWeight(host: HostView, server: Server): Double {
+ return host.availableMemory.toDouble()
+ }
+
+ override fun toString(): String = "MemoryWeigher"
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
new file mode 100644
index 00000000..df5bcd6e
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host.
+ */
+public class ProvisionedCoresWeigher : HostWeigher {
+ override fun getWeight(host: HostView, server: Server): Double {
+ return host.provisionedCores.toDouble()
+ }
+
+ override fun toString(): String = "ProvisionedCoresWeigher"
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
new file mode 100644
index 00000000..1615df3a
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+import java.util.*
+
+/**
+ * A [HostWeigher] that assigns random weights to each host every selection.
+ */
+public class RandomWeigher(private val random: Random) : HostWeigher {
+ override fun getWeight(host: HostView, server: Server): Double = random.nextDouble()
+
+ override fun toString(): String = "RandomWeigher"
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 45a306aa..c6e24346 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -39,7 +39,10 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.MemoryWeigher
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.*
@@ -55,9 +58,12 @@ internal class ComputeServiceTest {
fun setUp() {
scope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(scope)
- val policy = AvailableMemoryAllocationPolicy()
+ val computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, policy)
+ service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
}
@AfterEach
@@ -257,6 +263,7 @@ internal class ComputeServiceTest {
server.start()
delay(5 * 60 * 1000)
+ every { host.state } returns HostState.UP
listeners.forEach { it.onStateChanged(host, HostState.UP) }
delay(5 * 60 * 1000)
@@ -286,6 +293,7 @@ internal class ComputeServiceTest {
delay(5 * 60 * 1000)
+ every { host.state } returns HostState.DOWN
listeners.forEach { it.onStateChanged(host, HostState.DOWN) }
server.start()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt
deleted file mode 100644
index db377914..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler
-
-import io.mockk.every
-import io.mockk.mockk
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.Arguments
-import org.junit.jupiter.params.provider.MethodSource
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
-import java.util.*
-import java.util.stream.Stream
-import kotlin.random.Random
-
-/**
- * Test suite for the [AllocationPolicy] interface.
- */
-internal class AllocationPolicyTest {
- @ParameterizedTest
- @MethodSource("activeServersArgs")
- fun testActiveServersPolicy(
- reversed: Boolean,
- hosts: Set<HostView>,
- server: Server,
- expectedHost: HostView?
- ) {
- val policy = NumberOfActiveServersAllocationPolicy(reversed)
- assertEquals(expectedHost, policy.invoke().select(hosts, server))
- }
-
- @ParameterizedTest
- @MethodSource("availableMemoryArgs")
- fun testAvailableMemoryPolicy(
- reversed: Boolean,
- hosts: Set<HostView>,
- server: Server,
- expectedHost: HostView?
- ) {
- val policy = AvailableMemoryAllocationPolicy(reversed)
- assertEquals(expectedHost, policy.invoke().select(hosts, server))
- }
-
- @ParameterizedTest
- @MethodSource("availableCoreMemoryArgs")
- fun testAvailableCoreMemoryPolicy(
- reversed: Boolean,
- hosts: Set<HostView>,
- server: Server,
- expectedHost: HostView?
- ) {
- val policy = AvailableMemoryAllocationPolicy(reversed)
- assertEquals(expectedHost, policy.invoke().select(hosts, server))
- }
-
- @ParameterizedTest
- @MethodSource("provisionedCoresArgs")
- fun testProvisionedPolicy(
- reversed: Boolean,
- hosts: Set<HostView>,
- server: Server,
- expectedHost: HostView?
- ) {
- val policy = ProvisionedCoresAllocationPolicy(reversed)
- assertEquals(expectedHost, policy.invoke().select(hosts, server))
- }
-
- @Suppress("unused")
- private companion object {
- /**
- * Test arguments for the [NumberOfActiveServersAllocationPolicy].
- */
- @JvmStatic
- fun activeServersArgs(): Stream<Arguments> {
- val random = Random(1)
- val hosts = List(4) { i ->
- val view = mockk<HostView>()
- every { view.host.uid } returns UUID(0, i.toLong())
- every { view.host.model.cpuCount } returns random.nextInt(1, 16)
- every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
- every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
- every { view.numberOfActiveServers } returns random.nextInt(0, 6)
- every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
- every { view.toString() } returns "HostView[$i,numberOfActiveServers=${view.numberOfActiveServers}]"
- view
- }
-
- val servers = List(2) {
- val server = mockk<Server>()
- every { server.flavor.cpuCount } returns random.nextInt(1, 8)
- every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
- server
- }
-
- return Stream.of(
- Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
- Arguments.of(false, hosts.toSet(), servers[1], hosts[1]),
- Arguments.of(true, hosts.toSet(), servers[1], hosts[0]),
- )
- }
-
- /**
- * Test arguments for the [AvailableCoreMemoryAllocationPolicy].
- */
- @JvmStatic
- fun availableCoreMemoryArgs(): Stream<Arguments> {
- val random = Random(1)
- val hosts = List(4) { i ->
- val view = mockk<HostView>()
- every { view.host.uid } returns UUID(0, i.toLong())
- every { view.host.model.cpuCount } returns random.nextInt(1, 16)
- every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
- every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
- every { view.numberOfActiveServers } returns random.nextInt(0, 6)
- every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
- every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]"
- view
- }
-
- val servers = List(2) {
- val server = mockk<Server>()
- every { server.flavor.cpuCount } returns random.nextInt(1, 8)
- every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
- server
- }
-
- return Stream.of(
- Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
- Arguments.of(false, hosts.toSet(), servers[1], hosts[2]),
- Arguments.of(true, hosts.toSet(), servers[1], hosts[1]),
- )
- }
-
- /**
- * Test arguments for the [AvailableMemoryAllocationPolicy].
- */
- @JvmStatic
- fun availableMemoryArgs(): Stream<Arguments> {
- val random = Random(1)
- val hosts = List(4) { i ->
- val view = mockk<HostView>()
- every { view.host.uid } returns UUID(0, i.toLong())
- every { view.host.model.cpuCount } returns random.nextInt(1, 16)
- every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
- every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
- every { view.numberOfActiveServers } returns random.nextInt(0, 6)
- every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
- every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]"
- view
- }
-
- val servers = List(2) {
- val server = mockk<Server>()
- every { server.flavor.cpuCount } returns random.nextInt(1, 8)
- every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
- server
- }
-
- return Stream.of(
- Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
- Arguments.of(false, hosts.toSet(), servers[1], hosts[2]),
- Arguments.of(true, hosts.toSet(), servers[1], hosts[1]),
- )
- }
-
- /**
- * Test arguments for the [ProvisionedCoresAllocationPolicy].
- */
- @JvmStatic
- fun provisionedCoresArgs(): Stream<Arguments> {
- val random = Random(1)
- val hosts = List(4) { i ->
- val view = mockk<HostView>()
- every { view.host.uid } returns UUID(0, i.toLong())
- every { view.host.model.cpuCount } returns random.nextInt(1, 16)
- every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
- every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
- every { view.numberOfActiveServers } returns random.nextInt(0, 6)
- every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
- every { view.toString() } returns "HostView[$i,provisionedCores=${view.provisionedCores}]"
- view
- }
-
- val servers = List(2) {
- val server = mockk<Server>()
- every { server.flavor.cpuCount } returns random.nextInt(1, 8)
- every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
- server
- }
-
- return Stream.of(
- Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
- Arguments.of(false, hosts.toSet(), servers[1], hosts[0]),
- Arguments.of(true, hosts.toSet(), servers[1], hosts[0]),
- )
- }
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 40f50235..997eba0c 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -135,7 +135,7 @@ public suspend fun withComputeService(
clock: Clock,
meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy,
+ scheduler: ComputeScheduler,
block: suspend CoroutineScope.(ComputeService) -> Unit
): Unit = coroutineScope {
val hosts = environmentReader
@@ -154,18 +154,18 @@ public suspend fun withComputeService(
)
}
- val schedulerMeter = meterProvider.get("opendc-compute")
- val scheduler =
- ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
+ val serviceMeter = meterProvider.get("opendc-compute")
+ val service =
+ ComputeService(coroutineContext, clock, serviceMeter, scheduler)
for (host in hosts) {
- scheduler.addHost(host)
+ service.addHost(host)
}
try {
- block(this, scheduler)
+ block(this, service)
} finally {
- scheduler.close()
+ service.close()
hosts.forEach(SimHost::close)
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 5fa77161..941d3c97 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -31,6 +31,9 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.*
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -45,8 +48,9 @@ import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.Random
+import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -115,11 +119,11 @@ public abstract class Portfolio(name: String) : Experiment(name) {
@OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val seeder = Random(repeat)
+ val seeder = Random(repeat.toLong())
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createAllocationPolicy(seeder)
+ val allocationPolicy = createComputeScheduler(seeder)
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
@@ -142,7 +146,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
val performanceInterferenceModel = performanceInterferenceModel
?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder) ?: emptyMap()
+ ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
val monitor = ParquetExperimentMonitor(
@@ -184,20 +188,47 @@ public abstract class Portfolio(name: String) : Experiment(name) {
}
/**
- * Create the [AllocationPolicy] instance to use for the trial.
+ * Create the [ComputeScheduler] instance to use for the trial.
*/
- private fun createAllocationPolicy(seeder: Random): AllocationPolicy {
+ private fun createComputeScheduler(seeder: Random): ComputeScheduler {
return when (allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(InstanceCountWeigher() to 1.0)
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index d2e7473f..4a47922d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,7 +34,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
@@ -71,7 +74,10 @@ class CapelinIntegrationTest {
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
lateinit var monitorResults: ComputeMetrics
@@ -118,9 +124,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -130,7 +136,10 @@ class CapelinIntegrationTest {
val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
diff --git a/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index aa0f5ab4..c5982d8c 100644
--- a/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -32,8 +32,11 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.compute.service.scheduler.RandomAllocationPolicy
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.RandomWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
@@ -88,7 +91,10 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
val clock = DelayControllerClockAdapter(this)
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = RandomAllocationPolicy()
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(Random(0)) to 1.0)
+ )
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
@@ -125,7 +131,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
public suspend fun withComputeService(
clock: Clock,
meterProvider: MeterProvider,
- allocationPolicy: AllocationPolicy,
+ scheduler: ComputeScheduler,
block: suspend CoroutineScope.(ComputeService) -> Unit
): Unit = coroutineScope {
val model = createMachineModel()
@@ -144,18 +150,18 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
)
}
- val schedulerMeter = meterProvider.get("opendc-compute")
- val scheduler =
- ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
+ val serviceMeter = meterProvider.get("opendc-compute")
+ val service =
+ ComputeService(coroutineContext, clock, serviceMeter, scheduler)
for (host in hosts) {
- scheduler.addHost(host)
+ service.addHost(host)
}
try {
- block(this, scheduler)
+ block(this, service)
} finally {
- scheduler.close()
+ service.close()
hosts.forEach(SimHost::close)
}
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 5b717ff7..90918f44 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -43,11 +43,10 @@ import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
-import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
-import org.opendc.compute.service.scheduler.RandomAllocationPolicy
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
@@ -229,15 +228,42 @@ public class RunnerCli : CliktCommand(name = "runner") {
val operational = scenario.get("operational", Document::class.java)
val allocationPolicy =
when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(InstanceCountWeigher() to 1.0)
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(java.util.Random(seeder.nextLong())) to 1.0)
+ )
else -> throw IllegalArgumentException("Unknown policy $policyName")
}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
index 46c0d835..be59c8d2 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -35,7 +35,10 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
@@ -84,7 +87,11 @@ internal class WorkflowServiceIntegrationTest {
}
val meter = MeterProvider.noop().get("opendc-compute")
- val compute = ComputeService(coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+ val computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)
hosts.forEach { compute.addHost(it) }