Diffstat (limited to 'opendc-compute')
73 files changed, 1496 insertions, 1239 deletions
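Most of the changes below are mechanical formatting fixes to the Kotlin sources: trailing commas after the last argument, one parameter per line in multi-line signatures, and multi-line initializers wrapped onto their own indented line (plus a small test-dependency addition in opendc-compute-service). As a reading aid, here is a minimal sketch of the call style the diff converges on; it is illustrative only and not part of the commit, but the classes, packages, and ratios are taken directly from ComputeSchedulers.kt in the diff below.

    import org.opendc.compute.service.scheduler.FilterScheduler
    import org.opendc.compute.service.scheduler.filters.ComputeFilter
    import org.opendc.compute.service.scheduler.filters.RamFilter
    import org.opendc.compute.service.scheduler.filters.VCpuFilter
    import org.opendc.compute.service.scheduler.weights.CoreRamWeigher

    // Ratios mirror the constants used in createComputeScheduler.
    val cpuAllocationRatio = 16.0
    val ramAllocationRatio = 1.5

    // After the change, the constructor call moves to its own indented line
    // below the '=' and every argument line ends with a trailing comma.
    val scheduler =
        FilterScheduler(
            filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
            weighers = listOf(CoreRamWeigher(multiplier = 1.0)),
        )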
diff --git a/opendc-compute/opendc-compute-api/build.gradle.kts b/opendc-compute/opendc-compute-api/build.gradle.kts index 2ac7e64c..f9b04299 100644 --- a/opendc-compute/opendc-compute-api/build.gradle.kts +++ b/opendc-compute/opendc-compute-api/build.gradle.kts @@ -22,7 +22,7 @@ description = "API interface for the OpenDC Compute service" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index c26d0b8b..09cfe6f5 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -54,7 +54,7 @@ public interface ComputeClient : AutoCloseable { cpuCount: Int, memorySize: Long, labels: Map<String, String> = emptyMap(), - meta: Map<String, Any> = emptyMap() + meta: Map<String, Any> = emptyMap(), ): Flavor /** @@ -79,7 +79,7 @@ public interface ComputeClient : AutoCloseable { public fun newImage( name: String, labels: Map<String, String> = emptyMap(), - meta: Map<String, Any> = emptyMap() + meta: Map<String, Any> = emptyMap(), ): Image /** @@ -110,7 +110,7 @@ public interface ComputeClient : AutoCloseable { flavor: Flavor, labels: Map<String, String> = emptyMap(), meta: Map<String, Any> = emptyMap(), - start: Boolean = true + start: Boolean = true, ): Server /** diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt index 8fbb7308..497d5266 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt @@ -26,4 +26,6 @@ package org.opendc.compute.api * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to * fulfill a launch request. */ -public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request") +public class InsufficientServerCapacityException( + override val cause: Throwable? = null, +) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt index 2b5aebb1..a4d7d7d7 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt @@ -49,5 +49,5 @@ public enum class ServerState { /** * The server has been deleted and cannot be started later on. 
*/ - DELETED + DELETED, } diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt index cf995fc3..3229e101 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt @@ -32,5 +32,8 @@ public interface ServerWatcher { * @param server The server whose state has changed. * @param newState The new state of the server. */ - public fun onStateChanged(server: Server, newState: ServerState) {} + public fun onStateChanged( + server: Server, + newState: ServerState, + ) {} } diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index 1a73201e..0efdb05f 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -22,7 +22,7 @@ description = "OpenDC Compute Service implementation" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } @@ -33,6 +33,7 @@ dependencies { implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) + testImplementation(libs.log4j.slf4j) testRuntimeOnly(libs.log4j.core) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt index 2f071c13..18947146 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComputeSchedulers.kt @@ -37,48 +37,61 @@ import java.util.random.RandomGenerator /** * Create a [ComputeScheduler] for the experiment. 
*/ -public fun createComputeScheduler(name: String, seeder: RandomGenerator, placements: Map<String, String> = emptyMap()): ComputeScheduler { +public fun createComputeScheduler( + name: String, + seeder: RandomGenerator, + placements: Map<String, String> = emptyMap(), +): ComputeScheduler { val cpuAllocationRatio = 16.0 val ramAllocationRatio = 1.5 return when (name) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = SplittableRandom(seeder.nextLong()) - ) + "mem" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)), + ) + "mem-inv" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)), + ) + "core-mem" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + "core-mem-inv" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)), + ) + "active-servers" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) + "active-servers-inv" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)), + ) + "provisioned-cores" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + 
weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)), + ) + "provisioned-cores-inv" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)), + ) + "random" -> + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = SplittableRandom(seeder.nextLong()), + ) "replay" -> ReplayScheduler(placements) else -> throw IllegalArgumentException("Unknown policy $name") } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 18a319e9..cdcd1af0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -46,7 +46,7 @@ public class FilterScheduler( private val filters: List<HostFilter>, private val weighers: List<HostWeigher>, private val subsetSize: Int = 1, - private val random: RandomGenerator = SplittableRandom(0) + private val random: RandomGenerator = SplittableRandom(0), ) : ComputeScheduler { /** * The pool of hosts available to the scheduler. @@ -69,36 +69,37 @@ public class FilterScheduler( val hosts = hosts val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } } - val subset = if (weighers.isNotEmpty()) { - val results = weighers.map { it.getWeights(filteredHosts, server) } - val weights = DoubleArray(filteredHosts.size) + val subset = + if (weighers.isNotEmpty()) { + val results = weighers.map { it.getWeights(filteredHosts, server) } + val weights = DoubleArray(filteredHosts.size) - for (result in results) { - val min = result.min - val range = (result.max - min) + for (result in results) { + val min = result.min + val range = (result.max - min) - // Skip result if all weights are the same - if (range == 0.0) { - continue - } + // Skip result if all weights are the same + if (range == 0.0) { + continue + } - val multiplier = result.multiplier - val factor = multiplier / range + val multiplier = result.multiplier + val factor = multiplier / range - for ((i, weight) in result.weights.withIndex()) { - weights[i] += factor * (weight - min) + for ((i, weight) in result.weights.withIndex()) { + weights[i] += factor * (weight - min) + } } - } - weights.indices - .asSequence() - .sortedByDescending { weights[it] } - .map { filteredHosts[it] } - .take(subsetSize) - .toList() - } else { - filteredHosts - } + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } return when (val maxSize = min(subsetSize, subset.size)) { 0 -> null diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt index 4339b3de..a6703c89 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt +++ 
b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayScheduler.kt @@ -49,8 +49,9 @@ public class ReplayScheduler(private val vmPlacements: Map<String, String>) : Co } override fun select(server: Server): HostView? { - val clusterName = vmPlacements[server.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") + val clusterName = + vmPlacements[server.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") val machinesInCluster = hosts.filter { it.host.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt index b562f838..23590c13 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeFilter.kt @@ -30,7 +30,10 @@ import org.opendc.compute.service.driver.HostState * A [HostFilter] that filters on active hosts. */ public class ComputeFilter : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { return host.host.state == HostState.UP } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt index 4a9f41c5..df67a19f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/DifferentHostFilter.kt @@ -30,7 +30,10 @@ import java.util.UUID * A [HostFilter] that ensures an instance is scheduled on a different host from a set of instances. */ public class DifferentHostFilter : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { @Suppress("UNCHECKED_CAST") val affinityUUIDs = server.meta["scheduler_hint:different_host"] as? Set<UUID> ?: return true return host.host.instances.none { it.uid in affinityUUIDs } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt index 78010fee..902c760e 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/HostFilter.kt @@ -34,5 +34,8 @@ public fun interface HostFilter { * Test whether the specified [host] should be included in the selection * for scheduling the specified [server]. 
*/ - public fun test(host: HostView, server: Server): Boolean + public fun test( + host: HostView, + server: Server, + ): Boolean } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt index 5aa38a88..d9348802 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/InstanceCountFilter.kt @@ -31,7 +31,10 @@ import org.opendc.compute.service.HostView * @param limit The maximum number of instances on the host. */ public class InstanceCountFilter(private val limit: Int) : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { return host.instanceCount < limit } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt index 275e8f1c..4792a7a0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -31,7 +31,10 @@ import org.opendc.compute.service.HostView * @param allocationRatio Virtual RAM to physical RAM allocation ratio. */ public class RamFilter(private val allocationRatio: Double) : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { val requested = server.flavor.memorySize val available = host.availableMemory val total = host.host.model.memoryCapacity diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt index c3753866..4c31c66a 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/SameHostFilter.kt @@ -30,7 +30,10 @@ import java.util.UUID * A [HostFilter] that ensures an instance is scheduled on the same host as all other instances in a set of instances. */ public class SameHostFilter : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { @Suppress("UNCHECKED_CAST") val affinityUUIDs = server.meta["scheduler_hint:same_host"] as? 
Set<UUID> ?: return true return host.host.instances.any { it.uid in affinityUUIDs } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt index d4dff76b..e3397e50 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt @@ -30,7 +30,10 @@ import org.opendc.compute.service.HostView * capacity on the host. */ public class VCpuCapacityFilter : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double val hostModel = host.host.model val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt index 448a6189..5d02873f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt @@ -31,7 +31,10 @@ import org.opendc.compute.service.HostView * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ public class VCpuFilter(private val allocationRatio: Double) : HostFilter { - override fun test(host: HostView, server: Server): Boolean { + override fun test( + host: HostView, + server: Server, + ): Boolean { val requested = server.flavor.cpuCount val total = host.host.model.cpuCount val limit = total * allocationRatio diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt index f79d6d88..d6aafbc7 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt @@ -33,7 +33,10 @@ import org.opendc.compute.service.HostView * memory. 
*/ public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { + override fun getWeight( + host: HostView, + server: Server, + ): Double { return host.availableMemory.toDouble() / host.host.model.cpuCount } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt index 01799122..825cfff9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt @@ -38,12 +38,18 @@ public interface HostWeigher { /** * Obtain the weight of the specified [host] when scheduling the specified [server]. */ - public fun getWeight(host: HostView, server: Server): Double + public fun getWeight( + host: HostView, + server: Server, + ): Double /** * Obtain the weights for [hosts] when scheduling the specified [server]. */ - public fun getWeights(hosts: List<HostView>, server: Server): Result { + public fun getWeights( + hosts: List<HostView>, + server: Server, + ): Result { val weights = DoubleArray(hosts.size) var min = Double.MAX_VALUE var max = Double.MIN_VALUE @@ -70,6 +76,6 @@ public interface HostWeigher { public val weights: DoubleArray, public val min: Double, public val max: Double, - public val multiplier: Double + public val multiplier: Double, ) } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt index bfb583a2..9e0a9517 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt @@ -29,7 +29,10 @@ import org.opendc.compute.service.HostView * A [HostWeigher] that weighs the hosts based on the number of instances on the host. */ public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { + override fun getWeight( + host: HostView, + server: Server, + ): Double { return host.instanceCount.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt index bb837fbe..fca2e893 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt @@ -32,7 +32,10 @@ import org.opendc.compute.service.HostView * available memory, and a negative number will result in the scheduler preferring hosts with less memory. 
*/ public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double { + override fun getWeight( + host: HostView, + server: Server, + ): Double { return host.availableMemory.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt index f15f60c9..2912ce49 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt @@ -29,8 +29,10 @@ import org.opendc.compute.service.HostView * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity. */ public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { - - override fun getWeight(host: HostView, server: Server): Double { + override fun getWeight( + host: HostView, + server: Server, + ): Double { val model = host.host.model val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0 return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt index 169ad8cb..be93458f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt @@ -31,12 +31,14 @@ import org.opendc.compute.service.HostView * @param allocationRatio Virtual CPU to physical CPU allocation ratio. 
*/ public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher { - init { require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" } } - override fun getWeight(host: HostView, server: Server): Double { + override fun getWeight( + host: HostView, + server: Server, + ): Double { return host.host.model.cpuCount * allocationRatio - host.provisionedCores } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 4dc1cfa8..52caea0c 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -63,309 +63,324 @@ internal class ComputeServiceTest { @BeforeEach fun setUp() { scope = SimulationCoroutineScope() - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), - weighers = listOf(RamWeigher()) - ) + val computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), + weighers = listOf(RamWeigher()), + ) service = ComputeService(scope.dispatcher, computeScheduler, Duration.ofMinutes(5)) } @Test - fun testClientClose() = scope.runSimulation { - val client = service.newClient() + fun testClientClose() = + scope.runSimulation { + val client = service.newClient() - assertEquals(emptyList<Flavor>(), client.queryFlavors()) - assertEquals(emptyList<Image>(), client.queryImages()) - assertEquals(emptyList<Server>(), client.queryServers()) + assertEquals(emptyList<Flavor>(), client.queryFlavors()) + assertEquals(emptyList<Image>(), client.queryImages()) + assertEquals(emptyList<Server>(), client.queryServers()) - client.close() + client.close() - assertThrows<IllegalStateException> { client.queryFlavors() } - assertThrows<IllegalStateException> { client.queryImages() } - assertThrows<IllegalStateException> { client.queryServers() } + assertThrows<IllegalStateException> { client.queryFlavors() } + assertThrows<IllegalStateException> { client.queryImages() } + assertThrows<IllegalStateException> { client.queryServers() } - assertThrows<IllegalStateException> { client.findFlavor(UUID.randomUUID()) } - assertThrows<IllegalStateException> { client.findImage(UUID.randomUUID()) } - assertThrows<IllegalStateException> { client.findServer(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findFlavor(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findImage(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findServer(UUID.randomUUID()) } - assertThrows<IllegalStateException> { client.newFlavor("test", 1, 2) } - assertThrows<IllegalStateException> { client.newImage("test") } - assertThrows<IllegalStateException> { client.newServer("test", mockk(), mockk()) } - } + assertThrows<IllegalStateException> { client.newFlavor("test", 1, 2) } + assertThrows<IllegalStateException> { client.newImage("test") } + assertThrows<IllegalStateException> { client.newServer("test", mockk(), mockk()) } + } @Test - fun testClientCreate() = scope.runSimulation { - val client = service.newClient() - - val flavor = client.newFlavor("test", 1, 1024) - assertEquals(listOf(flavor), client.queryFlavors()) - 
assertEquals(flavor, client.findFlavor(flavor.uid)) - val image = client.newImage("test") - assertEquals(listOf(image), client.queryImages()) - assertEquals(image, client.findImage(image.uid)) - val server = client.newServer("test", image, flavor, start = false) - assertEquals(listOf(server), client.queryServers()) - assertEquals(server, client.findServer(server.uid)) - - server.delete() - assertNull(client.findServer(server.uid)) - - image.delete() - assertNull(client.findImage(image.uid)) - - flavor.delete() - assertNull(client.findFlavor(flavor.uid)) - - assertThrows<IllegalStateException> { server.start() } - } + fun testClientCreate() = + scope.runSimulation { + val client = service.newClient() + + val flavor = client.newFlavor("test", 1, 1024) + assertEquals(listOf(flavor), client.queryFlavors()) + assertEquals(flavor, client.findFlavor(flavor.uid)) + val image = client.newImage("test") + assertEquals(listOf(image), client.queryImages()) + assertEquals(image, client.findImage(image.uid)) + val server = client.newServer("test", image, flavor, start = false) + assertEquals(listOf(server), client.queryServers()) + assertEquals(server, client.findServer(server.uid)) + + server.delete() + assertNull(client.findServer(server.uid)) + + image.delete() + assertNull(client.findImage(image.uid)) + + flavor.delete() + assertNull(client.findFlavor(flavor.uid)) + + assertThrows<IllegalStateException> { server.start() } + } @Test - fun testClientOnClose() = scope.runSimulation { - service.close() - assertThrows<IllegalStateException> { - service.newClient() + fun testClientOnClose() = + scope.runSimulation { + service.close() + assertThrows<IllegalStateException> { + service.newClient() + } } - } @Test - fun testAddHost() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) + fun testAddHost() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.UP + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.UP - assertEquals(emptySet<Host>(), service.hosts) + assertEquals(emptySet<Host>(), service.hosts) - service.addHost(host) + service.addHost(host) - verify(exactly = 1) { host.addListener(any()) } + verify(exactly = 1) { host.addListener(any()) } - assertEquals(1, service.hosts.size) + assertEquals(1, service.hosts.size) - service.removeHost(host) + service.removeHost(host) - verify(exactly = 1) { host.removeListener(any()) } - } + verify(exactly = 1) { host.removeListener(any()) } + } @Test - fun testAddHostDouble() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) + fun testAddHostDouble() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.DOWN + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.DOWN - assertEquals(emptySet<Host>(), service.hosts) + assertEquals(emptySet<Host>(), service.hosts) - service.addHost(host) - service.addHost(host) + service.addHost(host) + service.addHost(host) - verify(exactly = 1) { host.addListener(any()) } - } + verify(exactly = 1) { host.addListener(any()) } + } @Test - fun testServerStartWithoutEnoughCpus() = scope.runSimulation { - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 0) - val image = client.newImage("test") - val server = client.newServer("test", 
image, flavor, start = false) - - server.start() - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.TERMINATED, server.state) - } + fun testServerStartWithoutEnoughCpus() = + scope.runSimulation { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 0) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testServerStartWithoutEnoughMemory() = scope.runSimulation { - val client = service.newClient() - val flavor = client.newFlavor("test", 0, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - - server.start() - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.TERMINATED, server.state) - } + fun testServerStartWithoutEnoughMemory() = + scope.runSimulation { + val client = service.newClient() + val flavor = client.newFlavor("test", 0, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testServerStartWithoutEnoughResources() = scope.runSimulation { - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - - server.start() - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.TERMINATED, server.state) - } + fun testServerStartWithoutEnoughResources() = + scope.runSimulation { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testServerCancelRequest() = scope.runSimulation { - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - - server.start() - server.stop() - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.TERMINATED, server.state) - } + fun testServerCancelRequest() = + scope.runSimulation { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + server.stop() + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testServerCannotFitOnHost() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) + fun testServerCannotFitOnHost() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.UP - every { host.canFit(any()) } returns false + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns false - service.addHost(host) + service.addHost(host) - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", 
image, flavor, start = false) + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) - server.start() - delay(10L * 60 * 1000) - server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + server.start() + delay(10L * 60 * 1000) + server.reload() + assertEquals(ServerState.PROVISIONING, server.state) - verify { host.canFit(server) } - } + verify { host.canFit(server) } + } @Test - fun testHostAvailableAfterSomeTime() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) - val listeners = mutableListOf<HostListener>() + fun testHostAvailableAfterSomeTime() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() - every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.DOWN - every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - every { host.canFit(any()) } returns false + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.DOWN + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false - service.addHost(host) + service.addHost(host) - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) - server.start() - delay(5L * 60 * 1000) + server.start() + delay(5L * 60 * 1000) - every { host.state } returns HostState.UP - listeners.forEach { it.onStateChanged(host, HostState.UP) } + every { host.state } returns HostState.UP + listeners.forEach { it.onStateChanged(host, HostState.UP) } - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.PROVISIONING, server.state) - verify { host.canFit(server) } - } + verify { host.canFit(server) } + } @Test - fun testHostUnavailableAfterSomeTime() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) - val listeners = mutableListOf<HostListener>() + fun testHostUnavailableAfterSomeTime() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() - every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.UP - every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - every { host.canFit(any()) } returns false + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.UP + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false - service.addHost(host) + service.addHost(host) - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = 
client.newServer("test", image, flavor, start = false) + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) - delay(5L * 60 * 1000) + delay(5L * 60 * 1000) - every { host.state } returns HostState.DOWN - listeners.forEach { it.onStateChanged(host, HostState.DOWN) } + every { host.state } returns HostState.DOWN + listeners.forEach { it.onStateChanged(host, HostState.DOWN) } - server.start() - delay(5L * 60 * 1000) - server.reload() - assertEquals(ServerState.PROVISIONING, server.state) + server.start() + delay(5L * 60 * 1000) + server.reload() + assertEquals(ServerState.PROVISIONING, server.state) - verify(exactly = 0) { host.canFit(server) } - } + verify(exactly = 0) { host.canFit(server) } + } @Test - fun testServerDeploy() = scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) - val listeners = mutableListOf<HostListener>() + fun testServerDeploy() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() - every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.UP - every { host.canFit(any()) } returns true - every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - service.addHost(host) + service.addHost(host) - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - val slot = slot<Server>() + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + val slot = slot<Server>() - val watcher = mockk<ServerWatcher>(relaxUnitFun = true) - server.watch(watcher) + val watcher = mockk<ServerWatcher>(relaxUnitFun = true) + server.watch(watcher) - // Start server - server.start() - delay(5L * 60 * 1000) - coVerify { host.spawn(capture(slot)) } + // Start server + server.start() + delay(5L * 60 * 1000) + coVerify { host.spawn(capture(slot)) } - listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } - server.reload() - assertEquals(ServerState.RUNNING, server.state) + server.reload() + assertEquals(ServerState.RUNNING, server.state) - verify { watcher.onStateChanged(server, ServerState.RUNNING) } + verify { watcher.onStateChanged(server, ServerState.RUNNING) } - // Stop server - listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) } + // Stop server + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) } - server.reload() - assertEquals(ServerState.TERMINATED, server.state) + server.reload() + assertEquals(ServerState.TERMINATED, server.state) - verify { watcher.onStateChanged(server, ServerState.TERMINATED) } - } + verify { watcher.onStateChanged(server, ServerState.TERMINATED) } + } @Test - fun testServerDeployFailure() = 
scope.runSimulation { - val host = mockk<Host>(relaxUnitFun = true) - val listeners = mutableListOf<HostListener>() - - every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4 * 2600.0, 4, 2048) - every { host.state } returns HostState.UP - every { host.canFit(any()) } returns true - every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } - coEvery { host.spawn(any()) } throws IllegalStateException() - - service.addHost(host) - - val client = service.newClient() - val flavor = client.newFlavor("test", 1, 1024) - val image = client.newImage("test") - val server = client.newServer("test", image, flavor, start = false) - - server.start() - delay(5L * 60 * 1000) - - server.reload() - assertEquals(ServerState.PROVISIONING, server.state) - } + fun testServerDeployFailure() = + scope.runSimulation { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + coEvery { host.spawn(any()) } throws IllegalStateException() + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5L * 60 * 1000) + + server.reload() + assertEquals(ServerState.PROVISIONING, server.state) + } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt index f9fcd27b..6e0f11b3 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ServiceServerTest.kt @@ -80,193 +80,205 @@ class ServiceServerTest { } @Test - fun testStartTerminatedServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testStartTerminatedServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - every { service.schedule(any()) } answers { ComputeService.SchedulingRequest(it.invocation.args[0] as ServiceServer, 0) } + every { service.schedule(any()) } answers { ComputeService.SchedulingRequest(it.invocation.args[0] as ServiceServer, 0) } - server.start() + server.start() - verify(exactly = 1) { service.schedule(server) } - assertEquals(ServerState.PROVISIONING, server.state) - } + verify(exactly = 1) { service.schedule(server) } + assertEquals(ServerState.PROVISIONING, server.state) + } @Test - fun testStartDeletedServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), 
mutableMapOf<String, Any>()) + fun testStartDeletedServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.DELETED) + server.setState(ServerState.DELETED) - assertThrows<IllegalStateException> { server.start() } - } + assertThrows<IllegalStateException> { server.start() } + } @Test - fun testStartProvisioningServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testStartProvisioningServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.PROVISIONING) + server.setState(ServerState.PROVISIONING) - server.start() + server.start() - assertEquals(ServerState.PROVISIONING, server.state) - } + assertEquals(ServerState.PROVISIONING, server.state) + } @Test - fun testStartRunningServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testStartRunningServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.RUNNING) + server.setState(ServerState.RUNNING) - server.start() + server.start() - assertEquals(ServerState.RUNNING, server.state) - } + assertEquals(ServerState.RUNNING, server.state) + } @Test - fun testStopProvisioningServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - val request = ComputeService.SchedulingRequest(server, 0) + fun testStopProvisioningServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + val request = ComputeService.SchedulingRequest(server, 0) - every { service.schedule(any()) } returns request + every { service.schedule(any()) } returns request - server.start() - server.stop() + server.start() + server.stop() - assertTrue(request.isCancelled) - assertEquals(ServerState.TERMINATED, server.state) - } + assertTrue(request.isCancelled) + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testStopTerminatedServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testStopTerminatedServer() = + runSimulation { + val service = 
mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.TERMINATED) - server.stop() + server.setState(ServerState.TERMINATED) + server.stop() - assertEquals(ServerState.TERMINATED, server.state) - } + assertEquals(ServerState.TERMINATED, server.state) + } @Test - fun testStopDeletedServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testStopDeletedServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.DELETED) - server.stop() + server.setState(ServerState.DELETED) + server.stop() - assertEquals(ServerState.DELETED, server.state) - } + assertEquals(ServerState.DELETED, server.state) + } @Test - fun testStopRunningServer() = runSimulation { - val service = mockk<ComputeService>() - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - val host = mockk<Host>(relaxUnitFun = true) - - server.setState(ServerState.RUNNING) - server.host = host - server.stop() - yield() - - verify { host.stop(server) } - } + fun testStopRunningServer() = + runSimulation { + val service = mockk<ComputeService>() + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + val host = mockk<Host>(relaxUnitFun = true) + + server.setState(ServerState.RUNNING) + server.host = host + server.stop() + yield() + + verify { host.stop(server) } + } @Test - fun testDeleteProvisioningServer() = runSimulation { - val service = mockk<ComputeService>(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - val request = ComputeService.SchedulingRequest(server, 0) + fun testDeleteProvisioningServer() = + runSimulation { + val service = mockk<ComputeService>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + val request = ComputeService.SchedulingRequest(server, 0) - every { service.schedule(any()) } returns request + every { service.schedule(any()) } returns request - server.start() - server.delete() + server.start() + server.delete() - assertTrue(request.isCancelled) - assertEquals(ServerState.DELETED, server.state) - verify { service.delete(server) } - } + assertTrue(request.isCancelled) + assertEquals(ServerState.DELETED, server.state) + verify { service.delete(server) } + } @Test - fun testDeleteTerminatedServer() = runSimulation { - val service = mockk<ComputeService>(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = 
ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testDeleteTerminatedServer() = + runSimulation { + val service = mockk<ComputeService>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.TERMINATED) - server.delete() + server.setState(ServerState.TERMINATED) + server.delete() - assertEquals(ServerState.DELETED, server.state) + assertEquals(ServerState.DELETED, server.state) - verify { service.delete(server) } - } + verify { service.delete(server) } + } @Test - fun testDeleteDeletedServer() = runSimulation { - val service = mockk<ComputeService>(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + fun testDeleteDeletedServer() = + runSimulation { + val service = mockk<ComputeService>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - server.setState(ServerState.DELETED) - server.delete() + server.setState(ServerState.DELETED) + server.delete() - assertEquals(ServerState.DELETED, server.state) - } + assertEquals(ServerState.DELETED, server.state) + } @Test - fun testDeleteRunningServer() = runSimulation { - val service = mockk<ComputeService>(relaxUnitFun = true) - val uid = UUID.randomUUID() - val flavor = mockFlavor() - val image = mockImage() - val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) - val host = mockk<Host>(relaxUnitFun = true) - - server.setState(ServerState.RUNNING) - server.host = host - server.delete() - yield() - - verify { host.delete(server) } - verify { service.delete(server) } - } + fun testDeleteRunningServer() = + runSimulation { + val service = mockk<ComputeService>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockFlavor() + val image = mockImage() + val server = ServiceServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf<String, Any>()) + val host = mockk<Host>(relaxUnitFun = true) + + server.setState(ServerState.RUNNING) + server.host = host + server.delete() + yield() + + verify { host.delete(server) } + verify { service.delete(server) } + } private fun mockFlavor(): ServiceFlavor { val flavor = mockk<ServiceFlavor>() diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt index 4af6f7ec..a48052a1 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -57,7 +57,7 @@ internal class FilterSchedulerTest { FilterScheduler( filters = emptyList(), weighers = emptyList(), - subsetSize = 0 + subsetSize = 0, ) } @@ -65,17 +65,18 @@ internal class FilterSchedulerTest { FilterScheduler( filters = emptyList(), weighers = emptyList(), - subsetSize = -2 + subsetSize = -2, ) } } @Test fun testNoHosts() { - val scheduler = 
FilterScheduler( - filters = emptyList(), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) val server = mockk<Server>() every { server.flavor.cpuCount } returns 2 @@ -86,10 +87,11 @@ internal class FilterSchedulerTest { @Test fun testNoFiltersAndSchedulers() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.DOWN @@ -107,18 +109,19 @@ internal class FilterSchedulerTest { // Make sure we get the first host both times assertAll( { assertEquals(hostA, scheduler.select(server)) }, - { assertEquals(hostA, scheduler.select(server)) } + { assertEquals(hostA, scheduler.select(server)) }, ) } @Test fun testNoFiltersAndSchedulersRandom() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = Random(1) - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(1), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.DOWN @@ -136,16 +139,17 @@ internal class FilterSchedulerTest { // Make sure we get the first host both times assertAll( { assertEquals(hostB, scheduler.select(server)) }, - { assertEquals(hostA, scheduler.select(server)) } + { assertEquals(hostA, scheduler.select(server)) }, ) } @Test fun testHostIsDown() { - val scheduler = FilterScheduler( - filters = listOf(ComputeFilter()), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) val host = mockk<HostView>() every { host.host.state } returns HostState.DOWN @@ -161,10 +165,11 @@ internal class FilterSchedulerTest { @Test fun testHostIsUp() { - val scheduler = FilterScheduler( - filters = listOf(ComputeFilter()), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) val host = mockk<HostView>() every { host.host.state } returns HostState.UP @@ -180,10 +185,11 @@ internal class FilterSchedulerTest { @Test fun testRamFilter() { - val scheduler = FilterScheduler( - filters = listOf(RamFilter(1.0)), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(RamFilter(1.0)), + weighers = emptyList(), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -207,10 +213,11 @@ internal class FilterSchedulerTest { @Test fun testRamFilterOvercommit() { - val scheduler = FilterScheduler( - filters = listOf(RamFilter(1.5)), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(RamFilter(1.5)), + weighers = emptyList(), + ) val host = mockk<HostView>() every { host.host.state } returns HostState.UP @@ -228,10 +235,11 @@ internal class FilterSchedulerTest { @Test fun testVCpuFilter() { - val scheduler = FilterScheduler( - filters = listOf(VCpuFilter(1.0)), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(VCpuFilter(1.0)), + weighers = emptyList(), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -255,10 +263,11 @@ internal class FilterSchedulerTest { @Test fun testVCpuFilterOvercommit() { - val scheduler = FilterScheduler( - filters = listOf(VCpuFilter(16.0)), - weighers = emptyList() - ) + 
val scheduler = + FilterScheduler( + filters = listOf(VCpuFilter(16.0)), + weighers = emptyList(), + ) val host = mockk<HostView>() every { host.host.state } returns HostState.UP @@ -276,10 +285,11 @@ internal class FilterSchedulerTest { @Test fun testVCpuCapacityFilter() { - val scheduler = FilterScheduler( - filters = listOf(VCpuCapacityFilter()), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(VCpuCapacityFilter()), + weighers = emptyList(), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -304,10 +314,11 @@ internal class FilterSchedulerTest { @Test fun testInstanceCountFilter() { - val scheduler = FilterScheduler( - filters = listOf(InstanceCountFilter(limit = 2)), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(InstanceCountFilter(limit = 2)), + weighers = emptyList(), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -331,10 +342,11 @@ internal class FilterSchedulerTest { @Test fun testAffinityFilter() { - val scheduler = FilterScheduler( - filters = listOf(SameHostFilter()), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(SameHostFilter()), + weighers = emptyList(), + ) val serverA = mockk<Server>() every { serverA.uid } returns UUID.randomUUID() @@ -370,10 +382,11 @@ internal class FilterSchedulerTest { @Test fun testAntiAffinityFilter() { - val scheduler = FilterScheduler( - filters = listOf(DifferentHostFilter()), - weighers = emptyList() - ) + val scheduler = + FilterScheduler( + filters = listOf(DifferentHostFilter()), + weighers = emptyList(), + ) val serverA = mockk<Server>() every { serverA.uid } returns UUID.randomUUID() @@ -409,10 +422,11 @@ internal class FilterSchedulerTest { @Test fun testRamWeigher() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = listOf(RamWeigher(1.5)) - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = listOf(RamWeigher(1.5)), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -436,10 +450,11 @@ internal class FilterSchedulerTest { @Test fun testCoreRamWeigher() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = listOf(CoreRamWeigher(1.5)) - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = listOf(CoreRamWeigher(1.5)), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -463,10 +478,11 @@ internal class FilterSchedulerTest { @Test fun testVCpuWeigher() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = listOf(VCpuWeigher(16.0)) - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = listOf(VCpuWeigher(16.0)), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP @@ -490,10 +506,11 @@ internal class FilterSchedulerTest { @Test fun testInstanceCountWeigher() { - val scheduler = FilterScheduler( - filters = emptyList(), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) + val scheduler = + FilterScheduler( + filters = emptyList(), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 625f278b..9692f6ba 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ 
b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -22,7 +22,7 @@ description = "Simulator for OpenDC Compute" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt index 49b3688e..ca72c910 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/MutableServiceRegistry.kt @@ -34,7 +34,11 @@ public interface MutableServiceRegistry : ServiceRegistry { * @param type The interface provided by the service. * @param service A reference to the actual implementation of the service. */ - public fun <T : Any> register(name: String, type: Class<T>, service: T) + public fun <T : Any> register( + name: String, + type: Class<T>, + service: T, + ) /** * Remove the service with [name] and [type] from this registry. @@ -42,7 +46,10 @@ public interface MutableServiceRegistry : ServiceRegistry { * @param name The name of the service to remove, which should follow the rules for domain names as defined by DNS. * @param type The type of the service to remove. */ - public fun remove(name: String, type: Class<*>) + public fun remove( + name: String, + type: Class<*>, + ) /** * Remove all services registered with [name]. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt index d3af3f01..5a4bced1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistry.kt @@ -36,7 +36,10 @@ public interface ServiceRegistry { * @param type The type of the service to resolve, identified by the interface that is implemented by the service. * @return The service with specified [name] and implementing [type] or `null` if it does not exist. */ - public fun <T : Any> resolve(name: String, type: Class<T>): T? + public fun <T : Any> resolve( + name: String, + type: Class<T>, + ): T? /** * Create a copy of the registry. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt index a9d05844..bf3ee43f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ServiceRegistryImpl.kt @@ -27,14 +27,21 @@ package org.opendc.compute.simulator */ internal class ServiceRegistryImpl(private val registry: MutableMap<String, MutableMap<Class<*>, Any>> = mutableMapOf()) : MutableServiceRegistry { - override fun <T : Any> resolve(name: String, type: Class<T>): T? { + override fun <T : Any> resolve( + name: String, + type: Class<T>, + ): T? { val servicesForName = registry[name] ?: return null @Suppress("UNCHECKED_CAST") return servicesForName[type] as T? 
} - override fun <T : Any> register(name: String, type: Class<T>, service: T) { + override fun <T : Any> register( + name: String, + type: Class<T>, + service: T, + ) { val services = registry.computeIfAbsent(name) { mutableMapOf() } if (type in services) { @@ -44,7 +51,10 @@ internal class ServiceRegistryImpl(private val registry: MutableMap<String, Muta services[type] = service } - override fun remove(name: String, type: Class<*>) { + override fun remove( + name: String, + type: Class<*>, + ) { val services = registry[name] ?: return services.remove(type) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 16ded689..47650f5d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -73,9 +73,8 @@ public class SimHost( private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = DefaultWorkloadMapper, private val bootModel: Supplier<SimWorkload?> = Supplier { null }, - private val optimize: Boolean = false + private val optimize: Boolean = false, ) : Host, AutoCloseable { - /** * The event listeners registered with this host. */ @@ -85,9 +84,9 @@ public class SimHost( * The virtual machines running on the hypervisor. */ private val guests = HashMap<Server, Guest>() - private val _guests = mutableListOf<Guest>() + private val temporaryGuests = mutableListOf<Guest>() // TODO: Determine a better naming for this - private var _state: HostState = HostState.DOWN + private var localState: HostState = HostState.DOWN set(value) { if (value != field) { listeners.forEach { it.onStateChanged(this, value) } @@ -95,24 +94,26 @@ public class SimHost( field = value } - private val model: HostModel = HostModel( - machine.model.cpus.sumOf { it.frequency }, - machine.model.cpus.size, - machine.model.memory.sumOf { it.size } - ) + private val model: HostModel = + HostModel( + machine.model.cpus.sumOf { it.frequency }, + machine.model.cpus.size, + machine.model.memory.sumOf { it.size }, + ) /** * The [GuestListener] that listens for guest events. 
*/ - private val guestListener = object : GuestListener { - override fun onStart(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } - } + private val guestListener = + object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - override fun onStop(guest: Guest) { - listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } } - } init { launch() @@ -135,7 +136,7 @@ public class SimHost( } override fun getState(): HostState { - return _state + return localState } override fun getInstances(): Set<Server> { @@ -155,17 +156,18 @@ public class SimHost( require(canFit(key)) { "Server does not fit" } val machine = hypervisor.newMachine(key.flavor.toMachineModel()) - val newGuest = Guest( - clock, - this, - hypervisor, - mapper, - guestListener, - server, - machine - ) - - _guests.add(newGuest) + val newGuest = + Guest( + clock, + this, + hypervisor, + mapper, + guestListener, + server, + machine, + ) + + temporaryGuests.add(newGuest) newGuest } } @@ -210,7 +212,7 @@ public class SimHost( var error = 0 var invalid = 0 - val guests = _guests.listIterator() + val guests = temporaryGuests.listIterator() for (guest in guests) { when (guest.state) { ServerState.TERMINATED -> terminated++ @@ -226,15 +228,15 @@ public class SimHost( } return HostSystemStats( - Duration.ofMillis(_uptime), - Duration.ofMillis(_downtime), - _bootTime, + Duration.ofMillis(localUptime), + Duration.ofMillis(localDowntime), + localBootTime, machine.psu.powerDraw, machine.psu.energyUsage, terminated, running, error, - invalid + invalid, ) } @@ -255,7 +257,7 @@ public class SimHost( hypervisor.cpuCapacity, hypervisor.cpuDemand, hypervisor.cpuUsage, - hypervisor.cpuUsage / _cpuLimit + hypervisor.cpuUsage / localCpuLimit, ) } @@ -275,7 +277,7 @@ public class SimHost( public fun fail() { reset(HostState.ERROR) - for (guest in _guests) { + for (guest in temporaryGuests) { guest.fail() } } @@ -299,31 +301,33 @@ public class SimHost( val bootWorkload = bootModel.get() val hypervisor = hypervisor - val hypervisorWorkload = object : SimWorkload by hypervisor { - override fun onStart(ctx: SimMachineContext) { - try { - _bootTime = clock.instant() - _state = HostState.UP - hypervisor.onStart(ctx) - - // Recover the guests that were running on the hypervisor. - for (guest in _guests) { - guest.recover() + val hypervisorWorkload = + object : SimWorkload by hypervisor { + override fun onStart(ctx: SimMachineContext) { + try { + localBootTime = clock.instant() + localState = HostState.UP + hypervisor.onStart(ctx) + + // Recover the guests that were running on the hypervisor. 
+ for (guest in temporaryGuests) { + guest.recover() + } + } catch (cause: Throwable) { + localState = HostState.ERROR + throw cause } - } catch (cause: Throwable) { - _state = HostState.ERROR - throw cause } } - } val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload // Launch hypervisor onto machine - ctx = machine.startWorkload(workload, emptyMap()) { cause -> - _state = if (cause != null) HostState.ERROR else HostState.DOWN - ctx = null - } + ctx = + machine.startWorkload(workload, emptyMap()) { cause -> + localState = if (cause != null) HostState.ERROR else HostState.DOWN + ctx = null + } } /** @@ -334,7 +338,7 @@ public class SimHost( // Stop the hypervisor ctx?.shutdown() - _state = state + localState = state } /** @@ -352,28 +356,28 @@ public class SimHost( return if (optimize) model.optimize() else model } - private var _lastReport = clock.millis() - private var _uptime = 0L - private var _downtime = 0L - private var _bootTime: Instant? = null - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + private var localLastReport = clock.millis() + private var localUptime = 0L + private var localDowntime = 0L + private var localBootTime: Instant? = null + private val localCpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime of a machine. */ private fun updateUptime() { val now = clock.millis() - val duration = now - _lastReport - _lastReport = now + val duration = now - localLastReport + localLastReport = now - if (_state == HostState.UP) { - _uptime += duration - } else if (_state == HostState.ERROR) { + if (localState == HostState.UP) { + localUptime += duration + } else if (localState == HostState.ERROR) { // Only increment downtime if the machine is in a failure state - _downtime += duration + localDowntime += duration } - val guests = _guests + val guests = temporaryGuests for (i in guests.indices) { guests[i].updateUptime() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt index 5e94830c..9511017f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModel.kt @@ -38,6 +38,6 @@ public interface FailureModel { context: CoroutineContext, clock: InstantSource, service: ComputeService, - random: RandomGenerator + random: RandomGenerator, ): HostFaultInjector } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt index 337f3c60..b8887627 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/FailureModels.kt @@ -46,7 +46,7 @@ public fun grid5000(failureInterval: Duration): FailureModel { context: CoroutineContext, clock: InstantSource, service: ComputeService, - random: RandomGenerator + random: RandomGenerator, ): HostFaultInjector { val rng = Well19937c(random.nextLong()) val hosts = service.hosts.map { it as SimHost }.toSet() @@ -59,7 +59,7 @@ public fun 
grid5000(failureInterval: Duration): FailureModel { hosts, iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)), ) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt index d34f70d7..faf536ad 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt @@ -32,5 +32,8 @@ public interface HostFault { /** * Apply the fault to the specified [victims]. */ - public suspend fun apply(clock: InstantSource, victims: List<SimHost>) + public suspend fun apply( + clock: InstantSource, + victims: List<SimHost>, + ) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt index afbb99d2..26084a1b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt @@ -60,7 +60,7 @@ public interface HostFaultInjector : AutoCloseable { hosts: Set<SimHost>, iat: RealDistribution, selector: VictimSelector, - fault: HostFault + fault: HostFault, ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault) } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt index 8bd25391..45545f3b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt @@ -32,7 +32,10 @@ import kotlin.math.roundToLong * A type of [HostFault] where the hosts are stopped and recover after some random amount of time. 
*/ public class StartStopHostFault(private val duration: RealDistribution) : HostFault { - override suspend fun apply(clock: InstantSource, victims: List<SimHost>) { + override suspend fun apply( + clock: InstantSource, + victims: List<SimHost>, + ) { for (host in victims) { host.fail() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt index 4aba0e91..93463cdb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -34,9 +34,8 @@ import kotlin.math.roundToInt */ public class StochasticVictimSelector( private val size: RealDistribution, - private val random: RandomGenerator = SplittableRandom(0) + private val random: RandomGenerator = SplittableRandom(0), ) : VictimSelector { - override fun select(hosts: Set<SimHost>): List<SimHost> { val n = size.sample().roundToInt() val result = ArrayList<SimHost>(n) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 354eb3d0..e268c506 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -47,7 +47,7 @@ internal class Guest( private val mapper: SimWorkloadMapper, private val listener: GuestListener, val server: Server, - val machine: SimVirtualMachine + val machine: SimVirtualMachine, ) { /** * The state of the [Guest]. @@ -132,9 +132,9 @@ internal class Guest( updateUptime() return GuestSystemStats( - Duration.ofMillis(_uptime), - Duration.ofMillis(_downtime), - _bootTime + Duration.ofMillis(localUptime), + Duration.ofMillis(localDowntime), + localBootTime, ) } @@ -152,7 +152,7 @@ internal class Guest( counters.cpuLostTime / 1000L, machine.cpuCapacity, machine.cpuUsage, - machine.cpuUsage / _cpuLimit + machine.cpuUsage / localCpuLimit, ) } @@ -173,10 +173,11 @@ internal class Guest( val workload: SimWorkload = mapper.createWorkload(server) workload.setOffset(clock.millis()) val meta = mapOf("driver" to host, "server" to server) + server.meta - ctx = machine.startWorkload(workload, meta) { cause -> - onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED) - ctx = null - } + ctx = + machine.startWorkload(workload, meta) { cause -> + onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED) + ctx = null + } } /** @@ -201,7 +202,7 @@ internal class Guest( * This method is invoked when the guest was started on the host and has booted into a running state. */ private fun onStart() { - _bootTime = clock.instant() + localBootTime = clock.instant() state = ServerState.RUNNING listener.onStart(this) } @@ -216,24 +217,24 @@ internal class Guest( listener.onStop(this) } - private var _uptime = 0L - private var _downtime = 0L - private var _lastReport = clock.millis() - private var _bootTime: Instant? 
= null - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + private var localUptime = 0L + private var localDowntime = 0L + private var localLastReport = clock.millis() + private var localBootTime: Instant? = null + private val localCpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime and downtime of the guest. */ fun updateUptime() { val now = clock.millis() - val duration = now - _lastReport - _lastReport = now + val duration = now - localLastReport + localLastReport = now if (state == ServerState.RUNNING) { - _uptime += duration + localUptime += duration } else if (state == ServerState.ERROR) { - _downtime += duration + localDowntime += duration } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt index afc0b0d4..c75ce528 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -52,7 +52,7 @@ internal class HostFaultInjectorImpl( private val hosts: Set<SimHost>, private val iat: RealDistribution, private val selector: VictimSelector, - private val fault: HostFault + private val fault: HostFault, ) : HostFaultInjector { /** * The scope in which the injector runs. @@ -72,10 +72,11 @@ internal class HostFaultInjectorImpl( return } - job = scope.launch { - runInjector() - job = null - } + job = + scope.launch { + runInjector() + job = null + } } /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt index 50e7bd0d..753cde16 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -34,10 +34,13 @@ import java.time.Duration public class ComputeMonitorProvisioningStep( private val serviceDomain: String, private val monitor: ComputeMonitor, - private val exportInterval: Duration + private val exportInterval: Duration, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val service = + requireNotNull( + ctx.registry.resolve(serviceDomain, ComputeService::class.java), + ) { "Compute service $serviceDomain does not exist" } val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) return AutoCloseable { metricReader.close() } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt index fc555016..484ae7ca 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt +++ 
b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeServiceProvisioningStep.kt @@ -36,12 +36,13 @@ import java.time.Duration public class ComputeServiceProvisioningStep internal constructor( private val serviceDomain: String, private val scheduler: (ProvisioningContext) -> ComputeScheduler, - private val schedulingQuantum: Duration + private val schedulingQuantum: Duration, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = ComputeService.builder(ctx.dispatcher, scheduler(ctx)) - .withQuantum(schedulingQuantum) - .build() + val service = + ComputeService.builder(ctx.dispatcher, scheduler(ctx)) + .withQuantum(schedulingQuantum) + .build() ctx.registry.register(serviceDomain, ComputeService::class.java, service) return AutoCloseable { service.close() } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 93f8fa4f..53294b1b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -40,7 +40,7 @@ import java.time.Duration public fun setupComputeService( serviceDomain: String, scheduler: (ProvisioningContext) -> ComputeScheduler, - schedulingQuantum: Duration = Duration.ofMinutes(5) + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ProvisioningStep { return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum) } @@ -56,7 +56,7 @@ public fun setupComputeService( public fun registerComputeMonitor( serviceDomain: String, monitor: ComputeMonitor, - exportInterval: Duration = Duration.ofMinutes(5) + exportInterval: Duration = Duration.ofMinutes(5), ): ProvisioningStep { return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval) } @@ -69,6 +69,10 @@ public fun registerComputeMonitor( * @param specs A list of [HostSpec] objects describing the simulated hosts to provision. * @param optimize A flag to indicate that the CPU resources of the host should be merged into a single CPU resource. 
*/ -public fun setupHosts(serviceDomain: String, specs: List<HostSpec>, optimize: Boolean = false): ProvisioningStep { +public fun setupHosts( + serviceDomain: String, + specs: List<HostSpec>, + optimize: Boolean = false, +): ProvisioningStep { return HostsProvisioningStep(serviceDomain, specs, optimize) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt index 3104ccbe..d9c5e7a6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/HostsProvisioningStep.kt @@ -40,10 +40,13 @@ import java.util.SplittableRandom public class HostsProvisioningStep internal constructor( private val serviceDomain: String, private val specs: List<HostSpec>, - private val optimize: Boolean + private val optimize: Boolean, ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } + val service = + requireNotNull( + ctx.registry.resolve(serviceDomain, ComputeService::class.java), + ) { "Compute service $serviceDomain does not exist" } val engine = FlowEngine.create(ctx.dispatcher) val graph = engine.newGraph() val hosts = mutableSetOf<SimHost>() @@ -52,15 +55,16 @@ public class HostsProvisioningStep internal constructor( val machine = SimBareMetalMachine.create(graph, spec.model, spec.psuFactory) val hypervisor = SimHypervisor.create(spec.multiplexerFactory, SplittableRandom(ctx.seeder.nextLong())) - val host = SimHost( - spec.uid, - spec.name, - spec.meta, - ctx.dispatcher.timeSource, - machine, - hypervisor, - optimize = optimize - ) + val host = + SimHost( + spec.uid, + spec.name, + spec.meta, + ctx.dispatcher.timeSource, + machine, + hypervisor, + optimize = optimize, + ) require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } service.addHost(host) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt index 275378e7..58d3a8c2 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/Provisioner.kt @@ -43,13 +43,14 @@ public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { /** * Implementation of [ProvisioningContext]. 
*/ - private val context = object : ProvisioningContext { - override val dispatcher: Dispatcher = dispatcher - override val seeder: SplittableRandom = SplittableRandom(seed) - override val registry: MutableServiceRegistry = ServiceRegistryImpl() + private val context = + object : ProvisioningContext { + override val dispatcher: Dispatcher = dispatcher + override val seeder: SplittableRandom = SplittableRandom(seed) + override val registry: MutableServiceRegistry = ServiceRegistryImpl() - override fun toString(): String = "Provisioner.ProvisioningContext" - } + override fun toString(): String = "Provisioner.ProvisioningContext" + } /** * The stack of handles to run during the clean-up process. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt index 0226a704..c5b2be72 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ProvisioningStep.kt @@ -56,6 +56,9 @@ public fun interface ProvisioningStep { * @param config The external configuration of the experiment runner. * @return The [ProvisioningStep] constructed according to [spec]. */ - public abstract fun create(spec: S, config: Config): ProvisioningStep + public abstract fun create( + spec: S, + config: Config, + ): ProvisioningStep } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e9bac8db..3a985486 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -62,243 +62,274 @@ internal class SimHostTest { fun setUp() { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - machineModel = MachineModel( - /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) + machineModel = + MachineModel( + // cpus + List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + // memory + List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }, + ) } /** * Test a single virtual machine hosted by the hypervisor. 
*/ @Test - fun testSingle() = runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - val host = SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor - ) - val vmImage = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) - ).createWorkload(1) - ) - ) - - val flavor = MockFlavor(2, 0) + fun testSingle() = + runSimulation { + val duration = 5 * 60L + + val engine = FlowEngine.create(dispatcher) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + + val host = + SimHost( + uid = UUID.randomUUID(), + name = "test", + meta = emptyMap(), + timeSource, + machine, + hypervisor, + ) + val vmImage = + MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2), + ).createWorkload(1), + ), + ) + + val flavor = MockFlavor(2, 0) - suspendCancellableCoroutine { cont -> - host.addListener(object : HostListener { - private var finished = 0 - - override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - if (newState == ServerState.TERMINATED && ++finished == 1) { - cont.resume(Unit) - } - } - }) - val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) - host.spawn(server) - host.start(server) - } + suspendCancellableCoroutine { cont -> + host.addListener( + object : HostListener { + private var finished = 0 + + override fun onStateChanged( + host: Host, + server: Server, + newState: ServerState, + ) { + if (newState == ServerState.TERMINATED && ++finished == 1) { + cont.resume(Unit) + } + } + }, + ) + val server = MockServer(UUID.randomUUID(), "a", flavor, vmImage) + host.spawn(server) + host.start(server) + } - // Ensure last cycle is collected - delay(1000L * duration) - host.close() + // Ensure last cycle is collected + delay(1000L * duration) + host.close() - val cpuStats = host.getCpuStats() + val cpuStats = host.getCpuStats() - assertAll( - { assertEquals(639564, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(2360433, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(56251, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1499999, timeSource.millis()) } - ) - } + assertAll( + { assertEquals(639564, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2360433, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(56251, cpuStats.stealTime, "Steal time does not match") }, + { assertEquals(1499999, timeSource.millis()) }, + ) + } /** * Test overcommitting of resources by the hypervisor. 
*/ @Test - fun testOvercommitted() = runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - - val host = SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor - ) - val vmImageA = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) - ).createWorkload(1) - ) - ) - val vmImageB = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), - SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) - ).createWorkload(1) - ) - ) - - val flavor = MockFlavor(2, 0) - - coroutineScope { - suspendCancellableCoroutine { cont -> - host.addListener(object : HostListener { - private var finished = 0 - - override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - if (newState == ServerState.TERMINATED && ++finished == 2) { - cont.resume(Unit) - } - } - }) - val serverA = MockServer(UUID.randomUUID(), "a", flavor, vmImageA) - host.spawn(serverA) - val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) - host.spawn(serverB) - - host.start(serverA) - host.start(serverB) + fun testOvercommitted() = + runSimulation { + val duration = 5 * 60L + + val engine = FlowEngine.create(dispatcher) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + + val host = + SimHost( + uid = UUID.randomUUID(), + name = "test", + meta = emptyMap(), + timeSource, + machine, + hypervisor, + ) + val vmImageA = + MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2), + ).createWorkload(1), + ), + ) + val vmImageB = + MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2), + ).createWorkload(1), + ), + ) + + val flavor = MockFlavor(2, 0) + + coroutineScope { + suspendCancellableCoroutine { cont -> + host.addListener( + object : HostListener { + private var finished = 0 + + override fun onStateChanged( + host: Host, + server: Server, + newState: ServerState, + ) { + if (newState == ServerState.TERMINATED && ++finished == 2) { + cont.resume(Unit) + } + } + }, + ) + val serverA = MockServer(UUID.randomUUID(), "a", flavor, 
vmImageA) + host.spawn(serverA) + val serverB = MockServer(UUID.randomUUID(), "b", flavor, vmImageB) + host.spawn(serverB) + + host.start(serverA) + host.start(serverB) + } } - } - // Ensure last cycle is collected - delay(1000L * duration) - host.close() + // Ensure last cycle is collected + delay(1000L * duration) + host.close() - val cpuStats = host.getCpuStats() + val cpuStats = host.getCpuStats() - assertAll( - { assertEquals(658502, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(2341496, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(637504, cpuStats.stealTime, "Steal time does not match") }, - { assertEquals(1499999, timeSource.millis()) } - ) - } + assertAll( + { assertEquals(658502, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2341496, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(637504, cpuStats.stealTime, "Steal time does not match") }, + { assertEquals(1499999, timeSource.millis()) }, + ) + } /** * Test failure of the host. */ @Test - fun testFailure() = runSimulation { - val duration = 5 * 60L - - val engine = FlowEngine.create(dispatcher) - val graph = engine.newGraph() - - val machine = SimBareMetalMachine.create(graph, machineModel) - val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) - val host = SimHost( - uid = UUID.randomUUID(), - name = "test", - meta = emptyMap(), - timeSource, - machine, - hypervisor - ) - val image = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to - SimTrace.ofFragments( - SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), - SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), - SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) - ).createWorkload(1) - ) - ) - val flavor = MockFlavor(2, 0) - val server = MockServer(UUID.randomUUID(), "a", flavor, image) - - coroutineScope { - host.spawn(server) - host.start(server) - delay(5000L) - host.fail() - delay(duration * 1000) - host.recover() - - suspendCancellableCoroutine { cont -> - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - if (newState == ServerState.TERMINATED) { - cont.resume(Unit) - } - } - }) + fun testFailure() = + runSimulation { + val duration = 5 * 60L + + val engine = FlowEngine.create(dispatcher) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + val host = + SimHost( + uid = UUID.randomUUID(), + name = "test", + meta = emptyMap(), + timeSource, + machine, + hypervisor, + ) + val image = + MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2), + ).createWorkload(1), + ), + ) + val flavor = MockFlavor(2, 0) + val server = MockServer(UUID.randomUUID(), "a", flavor, image) + + coroutineScope { + host.spawn(server) + host.start(server) + delay(5000L) + host.fail() + delay(duration * 1000) + host.recover() + + suspendCancellableCoroutine { cont -> + host.addListener( + object : HostListener 
{ + override fun onStateChanged( + host: Host, + server: Server, + newState: ServerState, + ) { + if (newState == ServerState.TERMINATED) { + cont.resume(Unit) + } + } + }, + ) + } } - } - host.close() - // Ensure last cycle is collected - delay(1000L * duration) - - val cpuStats = host.getCpuStats() - val sysStats = host.getSystemStats() - val guestSysStats = host.getSystemStats(server) - - assertAll( - { assertEquals(1770344, cpuStats.idleTime, "Idle time does not match") }, - { assertEquals(639653, cpuStats.activeTime, "Active time does not match") }, - { assertEquals(1204999, sysStats.uptime.toMillis(), "Uptime does not match") }, - { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, - { assertEquals(1204999, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, - { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") } - ) - } + host.close() + // Ensure last cycle is collected + delay(1000L * duration) + + val cpuStats = host.getCpuStats() + val sysStats = host.getSystemStats() + val guestSysStats = host.getSystemStats(server) + + assertAll( + { assertEquals(1770344, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(639653, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(1204999, sysStats.uptime.toMillis(), "Uptime does not match") }, + { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, + { assertEquals(1204999, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, + { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") }, + ) + } private class MockFlavor( override val cpuCount: Int, - override val memorySize: Long + override val memorySize: Long, ) : Flavor { override val uid: UUID = UUID.randomUUID() override val name: String = "test" @@ -318,7 +349,7 @@ internal class SimHostTest { override val uid: UUID, override val name: String, override val labels: Map<String, String>, - override val meta: Map<String, Any> + override val meta: Map<String, Any>, ) : Image { override fun delete() { throw NotImplementedError() @@ -333,7 +364,7 @@ internal class SimHostTest { override val uid: UUID, override val name: String, override val flavor: Flavor, - override val image: Image + override val image: Image, ) : Server { override val labels: Map<String, String> = emptyMap() diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt index 29d0b5e7..690bf472 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt @@ -43,64 +43,72 @@ class HostFaultInjectorTest { * Simple test case to test that nothing happens when the injector is not started. 
*/ @Test - fun testInjectorNotStarted() = runSimulation { - val host = mockk<SimHost>(relaxUnitFun = true) + fun testInjectorNotStarted() = + runSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) - val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) + val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) - coVerify(exactly = 0) { host.fail() } - coVerify(exactly = 0) { host.recover() } + coVerify(exactly = 0) { host.fail() } + coVerify(exactly = 0) { host.recover() } - injector.close() - } + injector.close() + } /** * Simple test case to test a start stop fault where the machine is stopped and started after some time. */ @Test - fun testInjectorStopsMachine() = runSimulation { - val host = mockk<SimHost>(relaxUnitFun = true) + fun testInjectorStopsMachine() = + runSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) - val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) + val injector = createSimpleInjector(coroutineContext, timeSource, setOf(host)) - injector.start() + injector.start() - delay(Duration.ofDays(55).toMillis()) + delay(Duration.ofDays(55).toMillis()) - injector.close() + injector.close() - coVerify(exactly = 1) { host.fail() } - coVerify(exactly = 1) { host.recover() } - } + coVerify(exactly = 1) { host.fail() } + coVerify(exactly = 1) { host.recover() } + } /** * Simple test case to test a start stop fault where multiple machines are stopped. */ @Test - fun testInjectorStopsMultipleMachines() = runSimulation { - val hosts = listOf<SimHost>( - mockk(relaxUnitFun = true), - mockk(relaxUnitFun = true) - ) + fun testInjectorStopsMultipleMachines() = + runSimulation { + val hosts = + listOf<SimHost>( + mockk(relaxUnitFun = true), + mockk(relaxUnitFun = true), + ) - val injector = createSimpleInjector(coroutineContext, timeSource, hosts.toSet()) + val injector = createSimpleInjector(coroutineContext, timeSource, hosts.toSet()) - injector.start() + injector.start() - delay(Duration.ofDays(55).toMillis()) + delay(Duration.ofDays(55).toMillis()) - injector.close() + injector.close() - coVerify(exactly = 1) { hosts[0].fail() } - coVerify(exactly = 1) { hosts[1].fail() } - coVerify(exactly = 1) { hosts[0].recover() } - coVerify(exactly = 1) { hosts[1].recover() } - } + coVerify(exactly = 1) { hosts[0].fail() } + coVerify(exactly = 1) { hosts[1].fail() } + coVerify(exactly = 1) { hosts[0].recover() } + coVerify(exactly = 1) { hosts[1].recover() } + } /** * Create a simple start stop fault injector. 
*/ - private fun createSimpleInjector(context: CoroutineContext, clock: InstantSource, hosts: Set<SimHost>): HostFaultInjector { + private fun createSimpleInjector( + context: CoroutineContext, + clock: InstantSource, + hosts: Set<SimHost>, + ): HostFaultInjector { val rng = Well19937c(0) val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03) val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25)) diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts index c403ccb9..f7af3877 100644 --- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts +++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts @@ -22,7 +22,7 @@ description = "OpenDC Compute Service implementation" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt index db875449..830101ef 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -53,7 +53,7 @@ public class ComputeMetricReader( dispatcher: Dispatcher, private val service: ComputeService, private val monitor: ComputeMonitor, - private val exportInterval: Duration = Duration.ofMinutes(5) + private val exportInterval: Duration = Duration.ofMinutes(5), ) : AutoCloseable { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) @@ -77,22 +77,23 @@ public class ComputeMetricReader( /** * The background job that is responsible for collecting the metrics every cycle. */ - private val job = scope.launch { - val intervalMs = exportInterval.toMillis() - try { - while (isActive) { - delay(intervalMs) - + private val job = + scope.launch { + val intervalMs = exportInterval.toMillis() + try { + while (isActive) { + delay(intervalMs) + + loggState() + } + } finally { loggState() - } - } finally { - loggState() - if (monitor is AutoCloseable) { - monitor.close() + if (monitor is AutoCloseable) { + monitor.close() + } } } - } private fun loggState() { try { @@ -127,7 +128,6 @@ public class ComputeMetricReader( * An aggregator for service metrics before they are reported. */ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { - override fun copy(): ServiceTableReader { val newServiceTable = ServiceTableReaderImpl(service) newServiceTable.setValues(this) @@ -402,16 +402,17 @@ public class ComputeMetricReader( /** * The static information about this server. */ - override val server = ServerInfo( - server.uid.toString(), - server.name, - "vm", - "x86", - server.image.uid.toString(), - server.image.name, - server.flavor.cpuCount, - server.flavor.memorySize - ) + override val server = + ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize, + ) /** * The [HostInfo] of the host on which the server is hosted. 
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt index f60fbe6d..1c910497 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt @@ -32,20 +32,23 @@ import java.io.File * A [ComputeMonitor] that logs the events to a Parquet file. */ public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = ParquetServerDataWriter( - File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) + private val serverWriter = + ParquetServerDataWriter( + File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, + bufferSize, + ) - private val hostWriter = ParquetHostDataWriter( - File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) + private val hostWriter = + ParquetHostDataWriter( + File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, + bufferSize, + ) - private val serviceWriter = ParquetServiceDataWriter( - File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) + private val serviceWriter = + ParquetServiceDataWriter( + File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, + bufferSize, + ) override fun record(reader: ServerTableReader) { serverWriter.write(reader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt index 34a75d75..b96ee28b 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt @@ -43,7 +43,7 @@ import kotlin.concurrent.thread public abstract class ParquetDataWriter<in T>( path: File, private val writeSupport: WriteSupport<T>, - bufferSize: Int = 4096 + bufferSize: Int = 4096, ) : AutoCloseable { /** * The logging instance to use. @@ -63,41 +63,44 @@ public abstract class ParquetDataWriter<in T>( /** * The thread that is responsible for writing the Parquet records. 
*/ - private val writerThread = thread(start = false, name = this.toString()) { - val writer = let { - val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - buildWriter(builder) - } + private val writerThread = + thread(start = false, name = this.toString()) { + val writer = + let { + val builder = + LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } - val queue = queue - val buf = mutableListOf<T>() - var shouldStop = false + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false - try { - while (!shouldStop) { - try { - writer.write(queue.take()) - } catch (e: InterruptedException) { - shouldStop = true - } + try { + while (!shouldStop) { + try { + writer.write(queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } - if (queue.drainTo(buf) > 0) { - for (data in buf) { - writer.write(data) + if (queue.drainTo(buf) > 0) { + for (data in buf) { + writer.write(data) + } + buf.clear() } - buf.clear() } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() } - } catch (e: Throwable) { - logger.error(e) { "Failure in Parquet data writer" } - exception = e - } finally { - writer.close() } - } /** * Build the [ParquetWriter] used to write the Parquet files. diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt index a6799ef8..b789e44f 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt @@ -40,7 +40,6 @@ import java.io.File */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { return builder .withDictionaryEncoding("host_id", true) @@ -67,7 +66,10 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, data: HostTableReader) { + private fun write( + consumer: RecordConsumer, + data: HostTableReader, + ) { consumer.startMessage() consumer.startField("timestamp", 0) @@ -165,76 +167,77 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : /** * The schema of the host data. 
*/ - val SCHEMA: MessageType = Types - .buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) + val SCHEMA: MessageType = + Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_demand"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_utilization"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("power_draw"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("energy_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_demand"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_utilization"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_draw"), + Types + 
.required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("energy_usage"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time") - ) - .named("host") + .named("boot_time"), + ) + .named("host") } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt index e8a28016..bcae6805 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt @@ -40,7 +40,6 @@ import java.io.File */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { return builder .withDictionaryEncoding("server_id", true) @@ -68,7 +67,10 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, data: ServerTableReader) { + private fun write( + consumer: RecordConsumer, + data: ServerTableReader, + ) { consumer.startMessage() consumer.startField("timestamp", 0) @@ -148,61 +150,61 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : /** * The schema of the server data. 
*/ - val SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) + val SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_name"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time") - - ) - .named("server") + .named("boot_time"), + ) + .named("server") } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt index a487203e..21247ef3 100644 --- 
a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt @@ -36,7 +36,6 @@ import java.io.File */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) { - override fun toString(): String = "service-writer" /** @@ -57,7 +56,10 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, data: ServiceTableReader) { + private fun write( + consumer: RecordConsumer, + data: ServiceTableReader, + ) { consumer.startMessage() consumer.startField("timestamp", 0) @@ -97,34 +99,35 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : } private companion object { - private val SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) + private val SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) // .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_up"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_down"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_pending"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_success"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_failure"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_error") - ) - .named("service") + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_up"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_down"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_pending"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_success"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_failure"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_error"), + ) + .named("service") } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt index bfe2f281..f9fff3e5 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -28,7 +28,6 @@ import java.time.Instant * An interface that is used to read a row of a host trace entry. 
*/ public interface HostTableReader { - public fun copy(): HostTableReader public fun setValues(table: HostTableReader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt index 96c5bb13..fb83bf06 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt @@ -33,5 +33,5 @@ public data class ServerInfo( val imageId: String, val imageName: String, val cpuCount: Int, - val memCapacity: Long + val memCapacity: Long, ) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt index ec9743d8..0ebf9d2f 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -28,7 +28,6 @@ import java.time.Instant * An interface that is used to read a row of a server trace entry. */ public interface ServerTableReader { - public fun copy(): ServerTableReader public fun setValues(table: ServerTableReader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt index 0d8b2abd..ad4b3d49 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt @@ -36,7 +36,7 @@ public data class ServiceData( val serversActive: Int, val attemptsSuccess: Int, val attemptsFailure: Int, - val attemptsError: Int + val attemptsError: Int, ) /** @@ -52,6 +52,6 @@ public fun ServiceTableReader.toServiceData(): ServiceData { serversActive, attemptsSuccess, attemptsFailure, - attemptsError + attemptsError, ) } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index 501e317c..10757a27 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -28,7 +28,6 @@ import java.time.Instant * An interface that is used to read a row of a service trace entry. 
*/ public interface ServiceTableReader { - public fun copy(): ServiceTableReader public fun setValues(table: ServiceTableReader) diff --git a/opendc-compute/opendc-compute-topology/build.gradle.kts b/opendc-compute/opendc-compute-topology/build.gradle.kts index d4c084c0..0dedf8a9 100644 --- a/opendc-compute/opendc-compute-topology/build.gradle.kts +++ b/opendc-compute/opendc-compute-topology/build.gradle.kts @@ -22,7 +22,7 @@ description = "OpenDC Compute Topology implementation" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt index e36c4e1e..7a8a121c 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpec.kt @@ -42,5 +42,5 @@ public data class ClusterSpec( val memCapacity: Double, val hostCount: Int, val memCapacityPerHost: Double, - val cpuCountPerHost: Int + val cpuCountPerHost: Int, ) diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt index a1e9bc3d..13314f7d 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/ClusterSpecReader.kt @@ -65,16 +65,17 @@ public class ClusterSpecReader { val result = mutableListOf<ClusterSpec>() for (entry in it) { - val def = ClusterSpec( - entry.id, - entry.name, - entry.cpuCount, - entry.cpuSpeed * 1000, // Convert to MHz - entry.memCapacity * 1000, // Convert to MiB - entry.hostCount, - entry.memCapacityPerHost * 1000, - entry.cpuCountPerHost - ) + val def = + ClusterSpec( + entry.id, + entry.name, + entry.cpuCount, + entry.cpuSpeed * 1000, + entry.memCapacity * 1000, + entry.hostCount, + entry.memCapacityPerHost * 1000, + entry.cpuCountPerHost, + ) result.add(def) } @@ -97,25 +98,26 @@ public class ClusterSpecReader { @JsonProperty("memoryCapacityPerHost") val memCapacityPerHost: Double, @JsonProperty("coreCountPerHost") - val cpuCountPerHost: Int + val cpuCountPerHost: Int, ) public companion object { /** * The [CsvSchema] that is used to parse the trace. 
*/ - private val schema = CsvSchema.builder() - .addColumn("ClusterID", CsvSchema.ColumnType.STRING) - .addColumn("ClusterName", CsvSchema.ColumnType.STRING) - .addColumn("Cores", CsvSchema.ColumnType.NUMBER) - .addColumn("Speed", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory", CsvSchema.ColumnType.NUMBER) - .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER) - .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER) - .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setColumnSeparator(';') - .setUseHeader(true) - .build() + private val schema = + CsvSchema.builder() + .addColumn("ClusterID", CsvSchema.ColumnType.STRING) + .addColumn("ClusterName", CsvSchema.ColumnType.STRING) + .addColumn("Cores", CsvSchema.ColumnType.NUMBER) + .addColumn("Speed", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory", CsvSchema.ColumnType.NUMBER) + .addColumn("numberOfHosts", CsvSchema.ColumnType.NUMBER) + .addColumn("memoryCapacityPerHost", CsvSchema.ColumnType.NUMBER) + .addColumn("coreCountPerHost", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setColumnSeparator(';') + .setUseHeader(true) + .build() } } diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt index 596121b0..ffaa093e 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/HostSpec.kt @@ -44,5 +44,5 @@ public data class HostSpec( val meta: Map<String, Any>, val model: MachineModel, val psuFactory: SimPsuFactory = SimPsuFactories.noop(), - val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer(), ) diff --git a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt index 5f0fe511..aadf52a6 100644 --- a/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt +++ b/opendc-compute/opendc-compute-topology/src/main/kotlin/org/opendc/compute/topology/TopologyFactories.kt @@ -49,7 +49,7 @@ private val reader = ClusterSpecReader() public fun clusterTopology( file: File, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: RandomGenerator = SplittableRandom(0) + random: RandomGenerator = SplittableRandom(0), ): List<HostSpec> { return clusterTopology(reader.read(file), powerModel, random) } @@ -60,7 +60,7 @@ public fun clusterTopology( public fun clusterTopology( input: InputStream, powerModel: CpuPowerModel = CpuPowerModels.linear(350.0, 200.0), - random: RandomGenerator = SplittableRandom(0) + random: RandomGenerator = SplittableRandom(0), ): List<HostSpec> { return clusterTopology(reader.read(input), powerModel, random) } @@ -68,23 +68,31 @@ public fun clusterTopology( /** * Construct a topology from the given list of [clusters]. 
*/ -public fun clusterTopology(clusters: List<ClusterSpec>, powerModel: CpuPowerModel, random: RandomGenerator = SplittableRandom(0)): List<HostSpec> { +public fun clusterTopology( + clusters: List<ClusterSpec>, + powerModel: CpuPowerModel, + random: RandomGenerator = SplittableRandom(0), +): List<HostSpec> { return clusters.flatMap { it.toHostSpecs(random, powerModel) } } /** * Helper method to convert a [ClusterSpec] into a list of [HostSpec]s. */ -private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowerModel): List<HostSpec> { +private fun ClusterSpec.toHostSpecs( + random: RandomGenerator, + powerModel: CpuPowerModel, +): List<HostSpec> { val cpuSpeed = cpuSpeed val memoryPerHost = memCapacityPerHost.roundToLong() val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", cpuCountPerHost) val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) - val machineModel = MachineModel( - List(cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, - listOf(unknownMemoryUnit) - ) + val machineModel = + MachineModel( + List(cpuCountPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, cpuSpeed) }, + listOf(unknownMemoryUnit), + ) return List(hostCount) { HostSpec( @@ -92,7 +100,7 @@ private fun ClusterSpec.toHostSpecs(random: RandomGenerator, powerModel: CpuPowe "node-$name-$it", mapOf("cluster" to id), machineModel, - SimPsuFactories.simple(powerModel) + SimPsuFactories.simple(powerModel), ) } } diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 905f905c..58b7bc86 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -22,7 +22,7 @@ description = "OpenDC Compute Service implementation" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index a802afdb..c9f784ff 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -31,5 +31,8 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. 
*/ - public fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> + public fun resolve( + loader: ComputeWorkloadLoader, + random: RandomGenerator, + ): List<VirtualMachine> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index c5fb3e56..2202f851 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -29,18 +29,18 @@ import org.opendc.trace.Trace import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import java.io.File import java.lang.ref.SoftReference import java.time.Duration @@ -71,11 +71,11 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseFragments(trace: Trace): Map<String, Builder> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val durationCol = reader.resolve(RESOURCE_STATE_DURATION) - val coresCol = reader.resolve(RESOURCE_CPU_COUNT) - val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + val idCol = reader.resolve(resourceID) + val timestampCol = reader.resolve(resourceStateTimestamp) + val durationCol = reader.resolve(resourceStateDuration) + val coresCol = reader.resolve(resourceCpuCount) + val usageCol = reader.resolve(resourceStateCpuUsage) val fragments = mutableMapOf<String, Builder>() @@ -100,15 +100,19 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the metadata into a workload. 
*/ - private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> { + private fun parseMeta( + trace: Trace, + fragments: Map<String, Builder>, + interferenceModel: VmInterferenceModel, + ): List<VirtualMachine> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - val idCol = reader.resolve(RESOURCE_ID) - val startTimeCol = reader.resolve(RESOURCE_START_TIME) - val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) - val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + val idCol = reader.resolve(resourceID) + val startTimeCol = reader.resolve(resourceStartTime) + val stopTimeCol = reader.resolve(resourceStopTime) + val cpuCountCol = reader.resolve(resourceCpuCount) + val cpuCapacityCol = reader.resolve(resourceCpuCapacity) + val memCol = reader.resolve(resourceMemCapacity) var counter = 0 val entries = mutableListOf<VirtualMachine>() @@ -141,8 +145,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { submissionTime, endTime, builder.build(), - interferenceModel.getProfile(id) - ) + interferenceModel.getProfile(id), + ), ) } @@ -189,24 +193,28 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): List<VirtualMachine> { - val ref = cache.compute(name) { key, oldVal -> - val inst = oldVal?.get() - if (inst == null) { - val path = baseDir.resolve(key) - - logger.info { "Loading trace $key at $path" } - - val trace = Trace.open(path, format) - val fragments = parseFragments(trace) - val interferenceModel = parseInterferenceModel(trace) - val vms = parseMeta(trace, fragments, interferenceModel) - - SoftReference(vms) - } else { - oldVal + public fun get( + name: String, + format: String, + ): List<VirtualMachine> { + val ref = + cache.compute(name) { key, oldVal -> + val inst = oldVal?.get() + if (inst == null) { + val path = baseDir.resolve(key) + + logger.info { "Loading trace $key at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + val interferenceModel = parseInterferenceModel(trace) + val vms = parseMeta(trace, fragments, interferenceModel) + + SoftReference(vms) + } else { + oldVal + } } - } return checkNotNull(ref?.get()) { "Memory pressure" } } @@ -245,7 +253,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ - fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) { + fun add( + deadline: Instant, + duration: Duration, + usage: Double, + cores: Int, + ) { val startTimeMs = (deadline - duration).toEpochMilli() totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt index 61a6e3a0..8723f88b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -32,7 +32,10 @@ import org.opendc.compute.workload.internal.TraceComputeWorkload /** * Construct a workload from a trace. 
*/ -public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) +public fun trace( + name: String, + format: String = "opendc-vm", +): ComputeWorkload = TraceComputeWorkload(name, format) /** * Construct a composite workload with the specified fractions. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 622b3c55..deb50f5c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -50,5 +50,5 @@ public data class VirtualMachine( val startTime: Instant, val stopTime: Instant, val trace: SimTrace, - val interferenceProfile: VmInterferenceProfile? + val interferenceProfile: VmInterferenceProfile?, ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 1ac5f4ad..aba493b6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,7 +37,10 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { + override fun resolve( + loader: ComputeWorkloadLoader, + random: RandomGenerator, + ): List<VirtualMachine> { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index fdb599c1..4207b2be 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -35,7 +35,11 @@ import java.util.random.RandomGenerator * @param fraction The fraction of load/virtual machines to sample * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. */ -internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { +internal class HpcSampledComputeWorkload( + val source: ComputeWorkload, + val fraction: Double, + val sampleLoad: Boolean = false, +) : ComputeWorkload { /** * The logging instance of this class. 
*/ @@ -46,29 +50,35 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { + override fun resolve( + loader: ComputeWorkloadLoader, + random: RandomGenerator, + ): List<VirtualMachine> { val vms = source.resolve(loader, random) - val (hpc, nonHpc) = vms.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<VirtualMachine>() - hpc.mapTo(res) { sample(it, index) } - res + val (hpc, nonHpc) = + vms.partition { entry -> + val name = entry.name + name.matches(pattern) } - .flatten() - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<VirtualMachine>() - nonHpc.mapTo(res) { sample(it, index) } - res - } - .flatten() + val hpcSequence = + generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res + } + .flatten() + + val nonHpcSequence = + generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res + } + .flatten() logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } @@ -135,7 +145,10 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti /** * Sample a random trace entry. */ - private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + private fun sample( + entry: VirtualMachine, + i: Int, + ): VirtualMachine { val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) return entry.copy(uid = uid) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index 6014f37a..c89507fa 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,7 +37,10 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> { + override fun resolve( + loader: ComputeWorkloadLoader, + random: RandomGenerator, + ): List<VirtualMachine> { val vms = source.resolve(loader, random) val res = mutableListOf<VirtualMachine>() diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index ff88fa3e..39255c59 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -31,7 +31,10 @@ import java.util.random.RandomGenerator * A [ComputeWorkload] from a trace. 
 */
 internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
-    override fun resolve(loader: ComputeWorkloadLoader, random: RandomGenerator): List<VirtualMachine> {
+    override fun resolve(
+        loader: ComputeWorkloadLoader,
+        random: RandomGenerator,
+    ): List<VirtualMachine> {
         return loader.get(name, format)
     }
 }
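For orientation, the public entry points touched above compose the same way as before; only their formatting changed. The sketch below is illustrative and not part of the changeset: it uses only signatures visible in this diff (clusterTopology from opendc-compute-topology, trace(...) and ComputeWorkloadLoader from opendc-compute-workload, and the ParquetComputeMonitor constructor from opendc-compute-telemetry). The file paths, trace name, and partition label are placeholders, and wiring the monitor into a ComputeMetricReader additionally requires a Dispatcher and a running ComputeService, which this diff does not cover.

import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
import org.opendc.compute.topology.clusterTopology
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.trace
import java.io.File
import java.util.SplittableRandom

fun main() {
    // Host specs parsed from a cluster CSV; the path is a placeholder.
    val hosts = clusterTopology(File("input/topology.csv"))

    // Resolve a VM workload from a trace directory; the directory and trace name are placeholders.
    val loader = ComputeWorkloadLoader(File("input/traces"))
    val vms = trace("bitbrains-small").resolve(loader, SplittableRandom(0))

    println("Loaded ${hosts.size} hosts and ${vms.size} VMs")

    // Parquet telemetry sink; in a full run this is handed to ComputeMetricReader
    // together with a Dispatcher and a ComputeService (outside this diff).
    ParquetComputeMonitor(File("output"), partition = "run-0", bufferSize = 4096).close()
}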
