summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt64
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt50
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt)23
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt)8
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt)9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt36
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt (renamed from opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt)15
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt407
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt59
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt4
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt44
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt11
19 files changed, 682 insertions, 182 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 8af5f86e..e7807177 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -477,6 +477,8 @@ internal class ComputeServiceImpl(
if (newState == ServerState.RUNNING) {
_runningServers.add(1)
+ } else if (newState == ServerState.ERROR) {
+ _runningServers.add(-1)
} else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
index 1fe90454..8c2d4715 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt
@@ -26,6 +26,8 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
+import java.util.*
+import kotlin.math.min
/**
* A [ComputeScheduler] implementation that uses filtering and weighing passes to select
@@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher
*
* This implementation is based on the filter scheduler from OpenStack Nova.
* See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html
+ *
+ * @param filters The list of filters to apply when searching for an appropriate host.
+ * @param weighers The list of weighers to apply when searching for an appropriate host.
+ * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen.
+ * @param random A [Random] instance for selecting
*/
-public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler {
+public class FilterScheduler(
+ private val filters: List<HostFilter>,
+ private val weighers: List<HostWeigher>,
+ private val subsetSize: Int = 1,
+ private val random: Random = Random(0)
+) : ComputeScheduler {
/**
* The pool of hosts available to the scheduler.
*/
private val hosts = mutableListOf<HostView>()
+ init {
+ require(subsetSize >= 1) { "Subset size must be one or greater" }
+ }
+
override fun addHost(host: HostView) {
hosts.add(host)
}
@@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val
}
override fun select(server: Server): HostView? {
- return hosts.asSequence()
- .filter { host ->
- for (filter in filters) {
- if (!filter.test(host, server))
- return@filter false
+ val hosts = hosts
+ val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } }
+
+ val subset = if (weighers.isNotEmpty()) {
+ val results = weighers.map { it.getWeights(filteredHosts, server) }
+ val weights = DoubleArray(filteredHosts.size)
+
+ for (result in results) {
+ val min = result.min
+ val range = (result.max - min)
+
+ // Skip result if all weights are the same
+ if (range == 0.0) {
+ continue
}
- true
- }
- .sortedByDescending { host ->
- weighers.sumOf { (weigher, factor) -> weigher.getWeight(host, server) * factor }
+ val multiplier = result.multiplier
+ val factor = multiplier / range
+
+ for ((i, weight) in result.weights.withIndex()) {
+ weights[i] += factor * (weight - min)
+ }
}
- .firstOrNull()
+
+ weights.indices
+ .asSequence()
+ .sortedByDescending { weights[it] }
+ .map { filteredHosts[it] }
+ .take(subsetSize)
+ .toList()
+ } else {
+ filteredHosts
+ }
+
+ return when (val maxSize = min(subsetSize, subset.size)) {
+ 0 -> null
+ 1 -> subset[0]
+ else -> subset[random.nextInt(maxSize)]
+ }
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
new file mode 100644
index 00000000..a470a453
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host.
+ *
+ * @param allocationRatio Virtual RAM to physical RAM allocation ratio.
+ */
+public class RamFilter(private val allocationRatio: Double) : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requested = server.flavor.memorySize
+ val available = host.availableMemory
+ val total = host.host.model.memorySize
+
+ // Do not allow an instance to overcommit against itself, only against
+ // other instances.
+ if (requested > total) {
+ return false
+ }
+
+ val limit = total * allocationRatio
+ val used = total - available
+ val usable = limit - used
+ return usable >= requested
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
index 072440c5..abdd79f1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt
@@ -26,15 +26,22 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server
- * flavor.
+ * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ComputeCapabilitiesFilter : HostFilter {
+public class VCpuFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
- val fitsMemory = host.availableMemory >= server.flavor.memorySize
- val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount
- return fitsMemory && fitsCpu
- }
+ val requested = server.flavor.cpuCount
+ val total = host.host.model.cpuCount
+ val limit = total * allocationRatio
- override fun toString(): String = "ComputeCapabilitiesFilter"
+ // Do not allow an instance to overcommit against itself, only against other instances
+ if (requested > total) {
+ return false
+ }
+
+ val free = limit - host.provisionedCores
+ return free >= requested
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
index 12e6510e..d668fdaf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt
@@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the available memory per core on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available core memory, and a negative number will result in the scheduler preferring hosts with less available core
+ * memory.
*/
-public class CoreMemoryWeigher : HostWeigher {
+public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble() / host.host.model.cpuCount
}
- override fun toString(): String = "CoreMemoryWeigher"
+ override fun toString(): String = "CoreRamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
index d48ee9e0..aca8c4e6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt
@@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler
/**
* An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request.
*/
-public fun interface HostWeigher {
+public interface HostWeigher {
+ /**
+ * The multiplier for the weigher.
+ */
+ public val multiplier: Double
+
/**
* Obtain the weight of the specified [host] when scheduling the specified [server].
*/
public fun getWeight(host: HostView, server: Server): Double
+
+ /**
+ * Obtain the weights for [hosts] when scheduling the specified [server].
+ */
+ public fun getWeights(hosts: List<HostView>, server: Server): Result {
+ val weights = DoubleArray(hosts.size)
+ var min = Double.MAX_VALUE
+ var max = Double.MIN_VALUE
+
+ for ((i, host) in hosts.withIndex()) {
+ val weight = getWeight(host, server)
+ weights[i] = weight
+ min = kotlin.math.min(min, weight)
+ max = kotlin.math.max(max, weight)
+ }
+
+ return Result(weights, min, max, multiplier)
+ }
+
+ /**
+ * A result returned by the weigher.
+ *
+ * @param weights The weights returned by the weigher.
+ * @param min The minimum weight returned.
+ * @param max The maximum weight returned.
+ * @param multiplier The weight multiplier to use.
+ */
+ public class Result(
+ public val weights: DoubleArray,
+ public val min: Double,
+ public val max: Double,
+ public val multiplier: Double,
+ )
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
index 2ef733e5..732cbe03 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt
@@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView
/**
* A [HostWeigher] that weighs the hosts based on the number of instances on the host.
*/
-public class InstanceCountWeigher : HostWeigher {
+public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.instanceCount.toDouble()
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
index 115d8e4d..d18d31f4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt
@@ -26,12 +26,15 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the available memory on the host.
+ * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host.
+ *
+ * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more
+ * available memory, and a negative number will result in the scheduler preferring hosts with less memory.
*/
-public class MemoryWeigher : HostWeigher {
+public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher {
override fun getWeight(host: HostView, server: Server): Double {
return host.availableMemory.toDouble()
}
- override fun toString(): String = "MemoryWeigher"
+ override fun toString(): String = "RamWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
deleted file mode 100644
index 1615df3a..00000000
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.scheduler.weights
-
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.internal.HostView
-import java.util.*
-
-/**
- * A [HostWeigher] that assigns random weights to each host every selection.
- */
-public class RandomWeigher(private val random: Random) : HostWeigher {
- override fun getWeight(host: HostView, server: Server): Double = random.nextDouble()
-
- override fun toString(): String = "RandomWeigher"
-}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
index df5bcd6e..4a22269b 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt
@@ -26,12 +26,19 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
/**
- * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host.
+ * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available.
+ *
+ * @param allocationRatio Virtual CPU to physical CPU allocation ratio.
*/
-public class ProvisionedCoresWeigher : HostWeigher {
+public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher {
+
+ init {
+ require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" }
+ }
+
override fun getWeight(host: HostView, server: Server): Double {
- return host.provisionedCores.toDouble()
+ return host.host.model.cpuCount * allocationRatio - host.provisionedCores
}
- override fun toString(): String = "ProvisionedCoresWeigher"
+ override fun toString(): String = "VCpuWeigher"
}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 7817c473..c6c01ea2 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.MemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
import java.util.*
@@ -57,8 +58,8 @@ internal class ComputeServiceTest {
scope = SimulationCoroutineScope()
val clock = scope.clock
val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
+ weighers = listOf(RamWeigher())
)
val meter = MeterProvider.noop().get("opendc-compute")
service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
new file mode 100644
index 00000000..cafd4498
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
@@ -0,0 +1,407 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.driver.HostModel
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.InstanceCountFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
+import java.util.*
+
+/**
+ * Test suite for the [FilterScheduler].
+ */
+internal class FilterSchedulerTest {
+ @Test
+ fun testInvalidSubsetSize() {
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = 0
+ )
+ }
+
+ assertThrows<IllegalArgumentException> {
+ FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = -2
+ )
+ }
+ }
+
+ @Test
+ fun testNoHosts() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulers() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostA, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testNoFiltersAndSchedulersRandom() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = Random(1)
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.DOWN
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ // Make sure we get the first host both times
+ assertAll(
+ { assertEquals(hostB, scheduler.select(server)) },
+ { assertEquals(hostA, scheduler.select(server)) }
+ )
+ }
+
+ @Test
+ fun testHostIsDown() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.DOWN
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testHostIsUp() {
+ val scheduler = FilterScheduler(
+ filters = listOf(ComputeFilter()),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(host, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 512
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 2048
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(RamFilter(1.5)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.availableMemory } returns 2048
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 2300
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(1.0)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 3
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuFilterOvercommit() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuFilter(16.0)),
+ weighers = emptyList(),
+ )
+
+ val host = mockk<HostView>()
+ every { host.host.state } returns HostState.UP
+ every { host.host.model } returns HostModel(4, 2048)
+ every { host.provisionedCores } returns 0
+
+ scheduler.addHost(host)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 8
+ every { server.flavor.memorySize } returns 1024
+
+ assertNull(scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(InstanceCountFilter(limit = 2)),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(RamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostA, scheduler.select(server))
+ }
+
+ @Test
+ fun testCoreRamWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(CoreRamWeigher(1.5)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(12, 2048)
+ every { hostA.availableMemory } returns 1024
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testVCpuWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(VCpuWeigher(16.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.provisionedCores } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.provisionedCores } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
+ fun testInstanceCountWeigher() {
+ val scheduler = FilterScheduler(
+ filters = emptyList(),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0)),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.instanceCount } returns 2
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.instanceCount } returns 0
+
+ scheduler.addHost(hostA)
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 42d240dc..2c443678 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -38,6 +38,15 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -301,3 +310,53 @@ fun createMeterProvider(clock: Clock): MeterProvider {
.registerView(powerSelector, powerView)
.build()
}
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = java.util.Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index cbb5bfd9..ee832af8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,10 +28,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.*
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
@@ -49,6 +45,7 @@ import java.io.File
import java.io.FileInputStream
import java.util.*
import java.util.concurrent.ConcurrentHashMap
+import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -105,7 +102,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createComputeScheduler(seeder)
+ val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
val meterProvider = createMeterProvider(clock)
val workload = workload
@@ -167,50 +164,4 @@ abstract class Portfolio(name: String) : Experiment(name) {
val monitorResults = collectMetrics(meterProvider as MetricProducer)
logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
-
- /**
- * Create the [ComputeScheduler] instance to use for the trial.
- */
- private fun createComputeScheduler(seeder: Random): ComputeScheduler {
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
- }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 08e04ddf..75428011 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,9 +31,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -68,8 +69,8 @@ class CapelinIntegrationTest {
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
@@ -113,9 +114,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -125,8 +126,8 @@ class CapelinIntegrationTest {
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 8fc4f6b8..e64e20a2 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -32,9 +32,9 @@ import mu.KotlinLogging
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.RandomWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
@@ -81,8 +81,9 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(0)) to 1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(),
+ subsetSize = Int.MAX_VALUE
)
val meterProvider: MeterProvider = createMeterProvider(clock)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index c1b1450e..860c50ee 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -88,6 +88,10 @@ public abstract class SimAbstractResourceProvider(
* Update the counters of the resource provider.
*/
protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ if (work <= 0.0) {
+ return
+ }
+
val counters = _counters
val remainingWork = ctx.remainingWork
counters.demand += work
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 5b5ef802..c5f5cd03 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -32,9 +32,6 @@ import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
@@ -199,46 +196,7 @@ class RunnerCli : CliktCommand(name = "runner") {
val metricProducer = meterProvider as MetricProducer
val operational = scenario.operationalPhenomena
- val allocationPolicy =
- when (val policyName = operational.schedulerName) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to 1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to 1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(java.util.Random(seeder.nextLong())) to 1.0)
- )
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
+ val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder)
val trace = ParquetTraceReader(
listOf(traceReader),
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
index 38c774a9..d82959e7 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -34,9 +34,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
@@ -55,7 +56,7 @@ import kotlin.math.max
/**
* Integration test suite for the [WorkflowServiceImpl].
*/
-@DisplayName("WorkflowServiceImpl")
+@DisplayName("WorkflowService")
internal class WorkflowServiceIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -85,8 +86,8 @@ internal class WorkflowServiceIntegrationTest {
val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)