diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
| commit | aa9b32f8cd1467e9718959f400f6777e5d71737d (patch) | |
| tree | b88bbede15108c6855d7f94ded4c7054df186a72 /opendc-compute | |
| parent | eb0e0a3bc557c05a70eead388797ab850ea87366 (diff) | |
| parent | b7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff) | |
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of
OpenDC into the public Github repository in order to sync the progress
of both repositories.
Diffstat (limited to 'opendc-compute')
53 files changed, 3887 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index baa1ba2f..577fbc73 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -45,7 +45,7 @@ public interface ComputeClient : AutoCloseable { * * @param name The name of the flavor. * @param cpuCount The amount of CPU cores for this flavor. - * @param memorySize The size of the memory. + * @param memorySize The size of the memory in MB. * @param labels The identifying labels of the image. * @param meta The non-identifying meta-data of the image. */ diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index e0e48b0f..33cafc45 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index 5632a55e..fc092a3f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,7 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * - * @property vcpuCount The number of logical processing cores available for this host. + * @property cpuCount The number of logical processing cores available for this host. * @property memorySize The amount of memory available for this host in MB. */ public data class HostModel(public val cpuCount: Int, public val memorySize: Long) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 8af5f86e..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,7 +22,10 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -33,6 +36,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -40,15 +44,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -61,6 +68,11 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + + /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -104,60 +116,37 @@ internal class ComputeServiceImpl( private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. - */ - private val _submittedServers = meter.longCounterBuilder("servers.submitted") - .setDescription("Number of start requests") - .setUnit("1") - .build() - - /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. + * The number of scheduling attempts. */ - private val _runningServers = meter.longUpDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of servers that have finished running. + * The response time of the service. */ - private val _finishedServers = meter.longCounterBuilder("servers.finished") - .setDescription("Number of servers that finished running") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of hosts registered at the compute service. + * The number of servers that are pending. */ - private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") - .build() - - /** - * The number of available hosts registered at the compute service. - */ - private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -170,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -297,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -325,10 +325,9 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - val request = SchedulingRequest(server) + val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -354,10 +353,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -368,12 +369,13 @@ internal class ComputeServiceImpl( * Run a single scheduling iteration. */ private fun doSchedule() { + val now = clock.millis() while (queue.isNotEmpty()) { val request = queue.peek() if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -385,12 +387,12 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } - server.state = ServerState.ERROR + server.state = ServerState.TERMINATED continue } else { break @@ -401,7 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -416,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -430,7 +438,7 @@ internal class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - internal data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) { /** * A flag to indicate that the request is cancelled. */ @@ -440,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -475,14 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -50,6 +53,21 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** + * The attributes of a server. + */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + + /** * The [Host] that has been assigned to host the server. */ internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 0fd5b2a4..8c2d4715 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -26,6 +26,8 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView import org.opendc.compute.service.scheduler.filters.HostFilter import org.opendc.compute.service.scheduler.weights.HostWeigher +import java.util.* +import kotlin.math.min /** * A [ComputeScheduler] implementation that uses filtering and weighing passes to select @@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher * * This implementation is based on the filter scheduler from OpenStack Nova. * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html + * + * @param filters The list of filters to apply when searching for an appropriate host. + * @param weighers The list of weighers to apply when searching for an appropriate host. + * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen. + * @param random A [Random] instance for selecting */ -public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler { +public class FilterScheduler( + private val filters: List<HostFilter>, + private val weighers: List<HostWeigher>, + private val subsetSize: Int = 1, + private val random: Random = Random(0) +) : ComputeScheduler { /** * The pool of hosts available to the scheduler. */ private val hosts = mutableListOf<HostView>() + init { + require(subsetSize >= 1) { "Subset size must be one or greater" } + } + override fun addHost(host: HostView) { hosts.add(host) } @@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val } override fun select(server: Server): HostView? { - return hosts.asSequence() - .filter { host -> - for (filter in filters) { - if (!filter.test(host, server)) - return@filter false + val hosts = hosts + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } } + + val subset = if (weighers.isNotEmpty()) { + val results = weighers.map { it.getWeights(filteredHosts, server) } + val weights = DoubleArray(filteredHosts.size) + + for (result in results) { + val min = result.min + val range = (result.max - min) + + // Skip result if all weights are the same + if (range == 0.0) { + continue } - true - } - .sortedByDescending { host -> - weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor } + val multiplier = result.multiplier + val factor = multiplier / range + + for ((i, weight) in result.weights.withIndex()) { + weights[i] += factor * (weight - min) + } } - .firstOrNull() + + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } + + return when (val maxSize = min(subsetSize, subset.size)) { + 0 -> null + 1 -> subset[0] + else -> subset[random.nextInt(maxSize)] + } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt new file mode 100644 index 00000000..a470a453 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host. + * + * @param allocationRatio Virtual RAM to physical RAM allocation ratio. + */ +public class RamFilter(private val allocationRatio: Double) : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requested = server.flavor.memorySize + val available = host.availableMemory + val total = host.host.model.memorySize + + // Do not allow an instance to overcommit against itself, only against + // other instances. + if (requested > total) { + return false + } + + val limit = total * allocationRatio + val used = total - available + val usable = limit - used + return usable >= requested + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt index 072440c5..abdd79f1 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt @@ -26,15 +26,22 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server - * flavor. + * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ -public class ComputeCapabilitiesFilter : HostFilter { +public class VCpuFilter(private val allocationRatio: Double) : HostFilter { override fun test(host: HostView, server: Server): Boolean { - val fitsMemory = host.availableMemory >= server.flavor.memorySize - val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount - return fitsMemory && fitsCpu - } + val requested = server.flavor.cpuCount + val total = host.host.model.cpuCount + val limit = total * allocationRatio - override fun toString(): String = "ComputeCapabilitiesFilter" + // Do not allow an instance to overcommit against itself, only against other instances + if (requested > total) { + return false + } + + val free = limit - host.provisionedCores + return free >= requested + } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt index 12e6510e..d668fdaf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt @@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView /** * A [HostWeigher] that weighs the hosts based on the available memory per core on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available core memory, and a negative number will result in the scheduler preferring hosts with less available core + * memory. */ -public class CoreMemoryWeigher : HostWeigher { +public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.availableMemory.toDouble() / host.host.model.cpuCount } - override fun toString(): String = "CoreMemoryWeigher" + override fun toString(): String = "CoreRamWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt index d48ee9e0..aca8c4e6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt @@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler /** * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request. */ -public fun interface HostWeigher { +public interface HostWeigher { + /** + * The multiplier for the weigher. + */ + public val multiplier: Double + /** * Obtain the weight of the specified [host] when scheduling the specified [server]. */ public fun getWeight(host: HostView, server: Server): Double + + /** + * Obtain the weights for [hosts] when scheduling the specified [server]. + */ + public fun getWeights(hosts: List<HostView>, server: Server): Result { + val weights = DoubleArray(hosts.size) + var min = Double.MAX_VALUE + var max = Double.MIN_VALUE + + for ((i, host) in hosts.withIndex()) { + val weight = getWeight(host, server) + weights[i] = weight + min = kotlin.math.min(min, weight) + max = kotlin.math.max(max, weight) + } + + return Result(weights, min, max, multiplier) + } + + /** + * A result returned by the weigher. + * + * @param weights The weights returned by the weigher. + * @param min The minimum weight returned. + * @param max The maximum weight returned. + * @param multiplier The weight multiplier to use. + */ + public class Result( + public val weights: DoubleArray, + public val min: Double, + public val max: Double, + public val multiplier: Double, + ) } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt index 2ef733e5..732cbe03 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt @@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView /** * A [HostWeigher] that weighs the hosts based on the number of instances on the host. */ -public class InstanceCountWeigher : HostWeigher { +public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.instanceCount.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt index 115d8e4d..d18d31f4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt @@ -26,12 +26,15 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostWeigher] that weighs the hosts based on the available memory on the host. + * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available memory, and a negative number will result in the scheduler preferring hosts with less memory. */ -public class MemoryWeigher : HostWeigher { +public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.availableMemory.toDouble() } - override fun toString(): String = "MemoryWeigher" + override fun toString(): String = "RamWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt index df5bcd6e..4a22269b 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt @@ -26,12 +26,19 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host. + * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ -public class ProvisionedCoresWeigher : HostWeigher { +public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher { + + init { + require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" } + } + override fun getWeight(host: HostView, server: Server): Double { - return host.provisionedCores.toDouble() + return host.host.model.cpuCount * allocationRatio - host.provisionedCores } - override fun toString(): String = "ProvisionedCoresWeigher" + override fun toString(): String = "VCpuWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index a6258845..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.MemoryWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import java.util.* @@ -57,11 +58,10 @@ internal class ComputeServiceTest { scope = SimulationCoroutineScope() val clock = scope.clock val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), + weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test @@ -167,9 +167,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -180,9 +180,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -193,9 +193,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -207,7 +207,7 @@ internal class ComputeServiceTest { server.start() server.stop() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.TERMINATED, server.state) } @@ -228,7 +228,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(10 * 60 * 1000) + delay(10L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -254,12 +254,12 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.UP listeners.forEach { it.onStateChanged(host, HostState.UP) } - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -284,13 +284,13 @@ internal class ComputeServiceTest { val image = client.newImage("test") val server = client.newServer("test", image, flavor, start = false) - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.DOWN listeners.forEach { it.onStateChanged(host, HostState.DOWN) } server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -344,7 +344,7 @@ internal class ComputeServiceTest { // Start server server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) coVerify { host.spawn(capture(slot), true) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -383,7 +383,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 20ea8d20..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,11 +99,11 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } server.start() @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,10 +158,10 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -220,10 +221,10 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk<InternalFlavor>() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk<InternalImage>() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt new file mode 100644 index 00000000..cafd4498 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.HostModel +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.InstanceCountFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Test suite for the [FilterScheduler]. + */ +internal class FilterSchedulerTest { + @Test + fun testInvalidSubsetSize() { + assertThrows<IllegalArgumentException> { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = 0 + ) + } + + assertThrows<IllegalArgumentException> { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = -2 + ) + } + } + + @Test + fun testNoHosts() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testNoFiltersAndSchedulers() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostA, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testNoFiltersAndSchedulersRandom() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(1) + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostB, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testHostIsDown() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.DOWN + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testHostIsUp() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(host, scheduler.select(server)) + } + + @Test + fun testRamFilter() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 512 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 2048 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.5)), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.availableMemory } returns 2048 + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 2300 + + assertNull(scheduler.select(server)) + } + + @Test + fun testVCpuFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 3 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(16.0)), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.provisionedCores } returns 0 + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 8 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testInstanceCountFilter() { + val scheduler = FilterScheduler( + filters = listOf(InstanceCountFilter(limit = 2)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(RamWeigher(1.5)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostA, scheduler.select(server)) + } + + @Test + fun testCoreRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(CoreRamWeigher(1.5)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(VCpuWeigher(16.0)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testInstanceCountWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } +} diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index b31a2114..aaf69f78 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -33,11 +33,13 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) - api(projects.opendcSimulator.opendcSimulatorFailures) + api(libs.commons.math3) implementation(projects.opendcUtils) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 68667a8c..b9d02185 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,29 +22,34 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.ScalingDriver -import org.opendc.simulator.compute.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.kernel.SimHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.PowerModel -import org.opendc.simulator.failures.FailureDomain -import java.time.Clock +import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -52,34 +57,27 @@ import kotlin.coroutines.resume public class SimHost( override val uid: UUID, override val name: String, - model: SimMachineModel, + model: MachineModel, override val meta: Map<String, Any>, context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver, + engine: FlowEngine, + meterProvider: MeterProvider, + hypervisorProvider: SimHypervisorProvider, + scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), + powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), -) : Host, FailureDomain, AutoCloseable { - - public constructor( - uid: UUID, - name: String, - model: SimMachineModel, - meta: Map<String, Any>, - context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - powerModel: PowerModel = ConstantPowerModel(0.0), - mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper) - + interferenceDomain: VmInterferenceDomain? = null, + private val optimize: Boolean = false +) : Host, AutoCloseable { /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context + Job()) + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The clock instance used by the host. + */ + private val clock = engine.clock /** * The logger instance of this server. @@ -87,52 +85,31 @@ public class SimHost( private val logger = KotlinLogging.logger {} /** - * The event listeners registered with this host. + * The [Meter] to track metrics of the simulated host. */ - private val listeners = mutableListOf<HostListener>() + private val meter = meterProvider.get("org.opendc.compute.simulator") /** - * Current total memory use of the images on this hypervisor. + * The event listeners registered with this host. */ - private var availableMemory: Long = model.memory.map { it.size }.sum() + private val listeners = mutableListOf<HostListener>() /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( - scope.coroutineContext, clock, - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - - _batch.put(_cpuWork, requestedWork.toDouble()) - _batch.put(_cpuWorkGranted, grantedWork.toDouble()) - _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble()) - _batch.put(_cpuWorkInterference, interferedWork.toDouble()) - _batch.put(_cpuUsage, cpuUsage) - _batch.put(_cpuDemand, cpuDemand) - _batch.put(_cpuPower, machine.powerDraw) - _batch.record() - } - } - ) + private val hypervisor: SimHypervisor = hypervisorProvider + .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) /** * The virtual machines running on the hypervisor. */ private val guests = HashMap<Server, Guest>() + private val _guests = mutableListOf<Guest>() override val state: HostState get() = _state @@ -144,123 +121,99 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) - - /** - * The number of guests on the host. - */ - private val _guests = meter.longUpDownCounterBuilder("guests.total") - .setDescription("Number of guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The number of active guests on the host. - */ - private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The CPU usage on the host. - */ - private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - - /** - * The CPU demand on the host. - */ - private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .build() - - /** - * The work actually performed by the CPU. - */ - private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .build() + override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The work that could not be performed by the CPU due to overcommitting resource. + * The [GuestListener] that listens for guest events. */ - private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .build() + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The work that could not be performed by the CPU due to interference. - */ - private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .build() + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } /** - * The batch recorder used to record multiple metrics atomically. + * The [Job] that represents the machine running the hypervisor. */ - private val _batch = meter.newBatchRecorder("host", uid.toString()) + private var _job: Job? = null init { - // Launch hypervisor onto machine - scope.launch { - try { - _state = HostState.UP - machine.run(this@SimHost.hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN - } - } + launch() + + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerUsage) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback { result -> result.observe(machine.energyUsage) } + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectUptime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { - val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount + val sufficientMemory = model.memorySize >= server.flavor.memorySize + val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit } override suspend fun spawn(server: Server, start: Boolean) { - // Return if the server already exists on this host - if (server in this) { - return + val guest = guests.computeIfAbsent(server) { key -> + require(canFit(key)) { "Server does not fit" } + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val newGuest = Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) + + _guests.add(newGuest) + newGuest } - require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) - guests[server] = guest - _guests.add(1) - if (start) { guest.start() } @@ -281,9 +234,8 @@ public class SimHost( } override suspend fun delete(server: Server) { - val guest = guests.remove(server) ?: return - guest.terminate() - _guests.add(-1) + val guest = guests[server] ?: return + guest.delete() } override fun addListener(listener: HostListener) { @@ -295,130 +247,233 @@ public class SimHost( } override fun close() { + reset() scope.cancel() machine.close() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + public suspend fun fail() { + reset() + + for (guest in _guests) { + guest.fail() + } + } + + public suspend fun recover() { + updateUptime() + + launch() + + // Wait for the hypervisor to launch before recovering the guests + yield() + + for (guest in _guests) { + guest.recover() + } + } + + /** + * Launch the hypervisor. + */ + private fun launch() { + check(_job == null) { "Concurrent hypervisor running" } + + // Launch hypervisor onto machine + _job = scope.launch { + try { + _bootTime = clock.millis() + _state = HostState.UP + machine.run(hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + + /** + * Reset the machine. + */ + private fun reset() { + updateUptime() + + // Stop the hypervisor + val job = _job + if (job != null) { + job.cancel() + _job = null + } + + _state = HostState.DOWN + } + /** * Convert flavor to machine model. */ - private fun Flavor.toMachineModel(): SimMachineModel { + private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - return SimMachineModel(processingUnits, memoryUnits) + return MachineModel(processingUnits, memoryUnits).optimize() } - private fun onGuestStart(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStart(vm.server.image.name) - } + /** + * Optimize the [MachineModel] for simulation. + */ + private fun MachineModel.optimize(): MachineModel { + if (!optimize) { + return this } - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStop(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStop(vm.server.image.name) + private val STATE_KEY = AttributeKey.stringKey("state") + + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") + + /** + * Helper function to collect the guest counts on this host. + */ + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ } } - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) } - override suspend fun fail() { - _state = HostState.DOWN - } + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) - override suspend fun recover() { - _state = HostState.UP + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuLimit(result) + } } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + /** - * A virtual machine instance that the driver manages. + * Helper function to track the CPU time of a machine. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + private fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = hypervisor.counters - var state: ServerState = ServerState.TERMINATED + result.observe(counters.cpuActiveTime / 1000L, _activeState) + result.observe(counters.cpuIdleTime / 1000L, _idleState) + result.observe(counters.cpuStealTime / 1000L, _stealState) + result.observe(counters.cpuLostTime / 1000L, _lostState) - suspend fun start() { - when (state) { - ServerState.TERMINATED -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } - } + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuTime(result) } + } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + private var _lastReport = clock.millis() - suspend fun terminate() { - stop() - state = ServerState.DELETED + /** + * Helper function to track the uptime of a machine. + */ + private fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + + if (_state == HostState.UP) { + _uptime += duration + } else if (_state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - job = null - } - } + val guests = _guests + for (i in guests.indices) { + guests[i].updateUptime(duration) } + } + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + + val guests = _guests + for (i in guests.indices) { + guests[i].collectUptime(result) } + } + + private var _bootTime = Long.MIN_VALUE - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } - availableMemory += server.flavor.memorySize - onGuestStop(this) + val guests = _guests + for (i in guests.indices) { + guests[i].collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt new file mode 100644 index 00000000..258ccc89 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost +import java.time.Clock + +/** + * Interface responsible for applying the fault to a host. + */ +public interface HostFault { + /** + * Apply the fault to the specified [victims]. + */ + public suspend fun apply(clock: Clock, victims: List<SimHost>) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt new file mode 100644 index 00000000..5eff439f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.internal.HostFaultInjectorImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * An interface for stochastically injecting faults into a set of hosts. + */ +public interface HostFaultInjector : AutoCloseable { + /** + * Start fault injection. + */ + public fun start() + + /** + * Stop fault injection into the system. + */ + public override fun close() + + public companion object { + /** + * Construct a new [HostFaultInjector]. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The hosts to inject the faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + iat: RealDistribution, + selector: VictimSelector, + fault: HostFault + ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt new file mode 100644 index 00000000..fc7cebfc --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.time.Clock +import kotlin.math.roundToLong + +/** + * A type of [HostFault] where the hosts are stopped and recover after some random amount of time. + */ +public class StartStopHostFault(private val duration: RealDistribution) : HostFault { + override suspend fun apply(clock: Clock, victims: List<SimHost>) { + for (host in victims) { + host.fail() + } + + val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds + + // Handle long overflow + if (clock.millis() + df <= 0) { + return + } + + delay(df) + + for (host in victims) { + host.recover() + } + } + + override fun toString(): String = "StartStopHostFault[$duration]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt new file mode 100644 index 00000000..fcd9dd7e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.util.* +import kotlin.math.roundToInt + +/** + * A [VictimSelector] that stochastically selects a set of hosts to be failed. + */ +public class StochasticVictimSelector( + private val size: RealDistribution, + private val random: Random = Random(0) +) : VictimSelector { + + override fun select(hosts: Set<SimHost>): List<SimHost> { + val n = size.sample().roundToInt() + return hosts.shuffled(random).take(n) + } + + override fun toString(): String = "StochasticVictimSelector[$size]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt new file mode 100644 index 00000000..b5610284 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost + +/** + * Interface responsible for selecting the victim(s) for fault injection. + */ +public interface VictimSelector { + /** + * Select the hosts from [hosts] where a fault will be injected. + */ + public fun select(hosts: Set<SimHost>): List<SimHost> +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..5ea1860d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,350 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.common.AttributesBuilder +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimVirtualMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. + */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = GuestAttributes(this) + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start deleted server" } + throw IllegalArgumentException("Server is deleted") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Delete the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. + */ + suspend fun delete() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * Recover the guest if it is in an error state. + */ + suspend fun recover() { + if (state != ServerState.ERROR) { + return + } + + doStart() + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Converge the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = attributes.toBuilder() + .put(STATE_KEY, "up") + .build() + private val _downState = attributes.toBuilder() + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime and downtime of the guest. + */ + fun updateUptime(duration: Long) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + } + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(result: ObservableLongMeasurement) { + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } + } + + private val _activeState = attributes.toBuilder() + .put(STATE_KEY, "active") + .build() + private val _stealState = attributes.toBuilder() + .put(STATE_KEY, "steal") + .build() + private val _lostState = attributes.toBuilder() + .put(STATE_KEY, "lost") + .build() + private val _idleState = attributes.toBuilder() + .put(STATE_KEY, "idle") + .build() + + /** + * Helper function to track the CPU time of a machine. + */ + fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = machine.counters + + result.observe(counters.cpuActiveTime / 1000, _activeState) + result.observe(counters.cpuIdleTime / 1000, _idleState) + result.observe(counters.cpuStealTime / 1000, _stealState) + result.observe(counters.cpuLostTime / 1000, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } + + /** + * An optimized [Attributes] implementation. + */ + private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { + /** + * Construct a [GuestAttributes] instance from a [Guest]. + */ + constructor(guest: Guest) : this( + guest.server.uid.toString(), + Attributes.builder() + .put(ResourceAttributes.HOST_NAME, guest.server.name) + .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) + .build() + ) + + override fun <T : Any?> get(key: AttributeKey<T>): T? { + // Optimize access to the HOST_ID key which is accessed quite often + if (key == ResourceAttributes.HOST_ID) { + @Suppress("UNCHECKED_CAST") + return uid as T? + } + return attributes.get(key) + } + + override fun toBuilder(): AttributesBuilder { + val delegate = attributes.toBuilder() + return object : AttributesBuilder { + + override fun putAll(attributes: Attributes): AttributesBuilder { + delegate.putAll(attributes) + return this + } + + override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder { + delegate.put<T>(key, value) + return this + } + + override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder { + delegate.put(key, value) + return this + } + + override fun build(): Attributes = GuestAttributes(uid, delegate.build()) + } + } + + override fun equals(other: Any?): Boolean = attributes == other + + // Cache hash code + private val _hash = attributes.hashCode() + + override fun hashCode(): Int = _hash + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt new file mode 100644 index 00000000..7d46e626 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import kotlinx.coroutines.* +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFault +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.VictimSelector +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * Internal implementation of the [HostFaultInjector] interface. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The set of hosts to inject faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ +internal class HostFaultInjectorImpl( + private val context: CoroutineContext, + private val clock: Clock, + private val hosts: Set<SimHost>, + private val iat: RealDistribution, + private val selector: VictimSelector, + private val fault: HostFault +) : HostFaultInjector { + /** + * The scope in which the injector runs. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * Start the fault injection into the system. + */ + override fun start() { + if (job != null) { + return + } + + job = scope.launch { + runInjector() + job = null + } + } + + /** + * Converge the injection process. + */ + private suspend fun runInjector() { + while (true) { + // Make sure to convert delay from hours to milliseconds + val d = (iat.sample() * 3.6e6).roundToLong() + + // Handle long overflow + if (clock.millis() + d <= 0) { + return + } + + delay(d) + + val victims = selector.select(hosts) + fault.apply(clock, victims) + } + } + + /** + * Stop the fault injector. + */ + public override fun close() { + scope.cancel() + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5594fd59..a0ff9228 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,47 +23,48 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.util.UUID +import java.time.Duration +import java.util.* import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var machineModel: SimMachineModel + private lateinit var machineModel: MachineModel @BeforeEach fun setUp() { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - machineModel = SimMachineModel( + machineModel = MachineModel( cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) @@ -74,16 +75,31 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val engine = FlowEngine(coroutineContext, clock) + val virtDriver = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), @@ -91,12 +107,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), + offset = 1 ) ) ) @@ -106,12 +123,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2) - ) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) + ), + offset = 1 ) ) ) @@ -121,20 +139,14 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() - grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() - overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() - return CompletableResultCode.ofSuccess() + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000 + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -155,18 +167,124 @@ internal class SimHostTest { } // Ensure last cycle is collected - delay(1000 * duration) + delay(1000L * duration) virtDriver.close() reader.close() assertAll( - { assertEquals(4197600, requestedWork, "Requested work does not match") }, - { assertEquals(2157600, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(658, activeTime, "Active time does not match") }, + { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } + /** + * Test failure of the host. + */ + @Test + fun testFailure() = runBlockingSimulation { + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setResource(hostResource) + .setClock(clock.toOtelClock()) + .build() + + val engine = FlowEngine(coroutineContext, clock) + val host = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) + val duration = 5 * 60L + val image = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) + ), + offset = 1 + ) + ) + ) + val flavor = MockFlavor(2, 0) + val server = MockServer(UUID.randomUUID(), "a", flavor, image) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } + + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime + } + }, + exportInterval = Duration.ofSeconds(duration) + ) + + coroutineScope { + host.spawn(server) + delay(5000L) + host.fail() + delay(duration * 1000) + host.recover() + + suspendCancellableCoroutine<Unit> { cont -> + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED) { + cont.resume(Unit) + } + } + }) + } + } + + host.close() + // Ensure last cycle is collected + delay(1000L * duration) + + reader.close() + + assertAll( + { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(624, activeTime, "Active time does not match") }, + { assertEquals(900001, uptime, "Uptime does not match") }, + { assertEquals(300000, downtime, "Downtime does not match") }, + { assertEquals(900000, guestUptime, "Guest uptime does not match") }, + { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + ) + } + private class MockFlavor( override val cpuCount: Int, override val memorySize: Long diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt new file mode 100644 index 00000000..f240a25f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.SimHost +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Test suite for [HostFaultInjector] class. + */ +internal class HostFaultInjectorTest { + /** + * Simple test case to test that nothing happens when the injector is not started. + */ + @Test + fun testInjectorNotStarted() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + coVerify(exactly = 0) { host.fail() } + coVerify(exactly = 0) { host.recover() } + + injector.close() + } + + /** + * Simple test case to test a start stop fault where the machine is stopped and started after some time. + */ + @Test + fun testInjectorStopsMachine() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { host.fail() } + coVerify(exactly = 1) { host.recover() } + } + + /** + * Simple test case to test a start stop fault where multiple machines are stopped. + */ + @Test + fun testInjectorStopsMultipleMachines() = runBlockingSimulation { + val hosts = listOf<SimHost>( + mockk(relaxUnitFun = true), + mockk(relaxUnitFun = true) + ) + + val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet()) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { hosts[0].fail() } + coVerify(exactly = 1) { hosts[1].fail() } + coVerify(exactly = 1) { hosts[0].recover() } + coVerify(exactly = 1) { hosts[1].recover() } + } + + /** + * Create a simple start stop fault injector. + */ + private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector { + val rng = Well19937c(0) + val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03) + val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25)) + val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + + return HostFaultInjector(context, clock, hosts, iat, selector, fault) + } +} diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts new file mode 100644 index 00000000..28a5e1da --- /dev/null +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating VM-based workloads with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcCompute.opendcComputeSimulator) + + implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcSimulator.opendcSimulatorCore) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(libs.opentelemetry.semconv) + + implementation(libs.kotlin.logging) + implementation(libs.jackson.databind) + implementation(libs.jackson.module.kotlin) + implementation(kotlin("reflect")) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt new file mode 100644 index 00000000..c94f30e4 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.compute.workload + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 1615df3a..78002c2f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -20,17 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.service.scheduler.weights +package org.opendc.compute.workload -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView import java.util.* /** - * A [HostWeigher] that assigns random weights to each host every selection. + * An interface that describes how a workload is resolved. */ -public class RandomWeigher(private val random: Random) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double = random.nextDouble() - - override fun toString(): String = "RandomWeigher" +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt new file mode 100644 index 00000000..1a6624f7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import mu.KotlinLogging +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.trace.* +import java.io.File +import java.time.Duration +import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A helper class for loading compute workload traces into memory. + * + * @param baseDir The directory containing the traces. + */ +public class ComputeWorkloadLoader(private val baseDir: File) { + /** + * The logger for this instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() + + /** + * Read the fragments into memory. + */ + private fun parseFragments(trace: Trace): Map<String, Builder> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + + val fragments = mutableMapOf<String, Builder>() + + return try { + while (reader.nextRow()) { + val id = reader.get(idCol) as String + val time = reader.get(timestampCol) as Instant + val duration = reader.get(durationCol) as Duration + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) + + val timeMs = time.toEpochMilli() + val deadlineMs = timeMs + duration.toMillis() + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(timeMs, deadlineMs, cpuUsage, cores) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + + var counter = 0 + val entries = mutableListOf<VirtualMachine>() + + return try { + while (reader.nextRow()) { + + val id = reader.get(idCol) as String + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = reader.get(startTimeCol) as Instant + val endTime = reader.get(stopTimeCol) as Instant + val maxCores = reader.getInt(coresCol) + val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad + + entries.add( + VirtualMachine( + uid, + id, + maxCores, + requiredMemory.roundToLong(), + totalLoad, + submissionTime, + endTime, + builder.build() + ) + ) + } + + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * Load the trace with the specified [name] and [format]. + */ + public fun get(name: String, format: String): List<VirtualMachine> { + return cache.computeIfAbsent(name) { + val path = baseDir.resolve(it) + + logger.info { "Loading trace $it at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + parseMeta(trace, fragments) + } + } + + /** + * Clear the workload cache. + */ + public fun reset() { + cache.clear() + } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val duration = max(0, deadline - timestamp) + totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + builder.add(timestamp, deadline, usage, cores) + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt new file mode 100644 index 00000000..283f82fe --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.flow.FlowEngine +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to simulated VM-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param scheduler [ComputeScheduler] implementation to use for the service. + * @param failureModel A failure model to use for injecting failures. + * @param interferenceModel The model to use for performance interference. + */ +public class ComputeWorkloadRunner( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + private val failureModel: FailureModel? = null, + private val interferenceModel: VmInterferenceModel? = null, +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + public val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + public val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [FlowEngine] to simulate the hosts. + */ + private val engine = FlowEngine(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + } + + /** + * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + */ + public suspend fun run(trace: List<VirtualMachine>, seed: Long) { + val random = Random(seed) + val injector = failureModel?.createInjector(context, clock, service, random) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + if (offset < 0) { + offset = start - now + } + + // Make sure the trace entries are ordered by submission time + assert(start - offset >= 0) { "Invalid trace order" } + delay(max(0, (start - offset) - now)) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity + ), + meta = mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + client.close() + } + } + + /** + * Register a host for this simulation. + * + * @param spec The definition of the host. + * @param optimize Merge the CPU resources of the host into a single CPU resource. + * @return The [SimHost] that has been constructed by the runner. + */ + public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { + val resource = Resource.builder() + .put(HOST_ID, spec.uid.toString()) + .put(HOST_NAME, spec.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, spec.model.cpus.size) + .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + _metricProducers.add(meterProvider) + + val host = SimHost( + spec.uid, + spec.name, + spec.model, + spec.meta, + context, + engine, + meterProvider, + spec.hypervisor, + powerDriver = spec.powerDriver, + interferenceDomain = interferenceModel?.newDomain(), + optimize = optimize + ) + + hosts.add(host) + service.addHost(host) + + return host + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt new file mode 100644 index 00000000..2f4935ca --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") +package org.opendc.compute.workload + +import org.opendc.compute.workload.internal.CompositeComputeWorkload +import org.opendc.compute.workload.internal.HpcSampledComputeWorkload +import org.opendc.compute.workload.internal.LoadSampledComputeWorkload +import org.opendc.compute.workload.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt new file mode 100644 index 00000000..4d9ef15d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +public interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt new file mode 100644 index 00000000..be7120b9 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.compute.workload + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import java.time.Clock +import java.time.Duration +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +public fun grid5000(failureInterval: Duration): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService, + random: Random + ): HostFaultInjector { + val rng = Well19937c(random.nextLong()) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt new file mode 100644 index 00000000..5dd239f6 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.simulator.compute.workload.SimTrace +import java.time.Instant +import java.util.* + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace that belong to this VM. + */ +public data class VirtualMachine( + val uid: UUID, + val name: String, + val cpuCount: Int, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: SimTrace, +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt new file mode 100644 index 00000000..ad182d67 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import io.opentelemetry.sdk.common.CompletableResultCode +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: ServerData) { + serverWriter.write(data) + } + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun shutdown(): CompletableResultCode { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + + return CompletableResultCode.ofSuccess() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..4172d729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalOutputFile +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + */ +public abstract class ParquetDataWriter<in T>( + path: File, + private val schema: Schema, + bufferSize: Int = 4096 +) : AutoCloseable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + + /** + * An exception to be propagated to the actual writer. + */ + private var exception: Throwable? = null + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) + + /** + * Write the specified metrics to the database. + */ + public fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) + } + + /** + * Signal the writer to stop. + */ + override fun close() { + writerThread.interrupt() + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Process the specified [data] to be written to the Parquet file. + */ + private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..98a0739e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.HostData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["host_id"] = data.host.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + builder["boot_time"] = bootTime?.toEpochMilli() + + builder["cpu_count"] = data.host.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.host.memCapacity + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid + } + + override fun toString(): String = "host-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..0d11ec23 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + builder["boot_time"] = bootTime?.toEpochMilli() + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_count"] = data.server.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity + } + + override fun toString(): String = "server-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredLong("scheduling_latency") + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..47824b29 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecordBuilder +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.io.File + +/** + * A Parquet event writer for [ServiceData]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { + + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError + } + + override fun toString(): String = "service-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("service") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..9b2bec55 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf<VirtualMachine>() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..52f4c672 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf<VirtualMachine>() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..ef6de729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + val res = mutableListOf<VirtualMachine>() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt new file mode 100644 index 00000000..c20cb8f3 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] from a trace. + */ +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + return loader.get(name, format) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt new file mode 100644 index 00000000..f3dc1e9e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.topology + +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerDriver +import java.util.* + +/** + * Description of a physical host that will be simulated by OpenDC and host the virtual machines. + * + * @param uid Unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param model The physical model of the machine. + * @param powerDriver The [PowerDriver] to model the power consumption of the machine. + * @param hypervisor The hypervisor implementation to use. + */ +public data class HostSpec( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerDriver: PowerDriver, + val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt new file mode 100644 index 00000000..3b8dc918 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.topology + +/** + * Representation of the environment of the compute service, describing the physical details of every host. + */ +public interface Topology { + /** + * Resolve the [Topology] into a list of [HostSpec]s. + */ + public fun resolve(): List<HostSpec> +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt new file mode 100644 index 00000000..74f9a1f8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TopologyHelpers") +package org.opendc.compute.workload.topology + +import org.opendc.compute.workload.ComputeWorkloadRunner + +/** + * Apply the specified [topology] to the given [ComputeWorkloadRunner]. + */ +public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) { + val hosts = topology.resolve() + for (spec in hosts) { + registerHost(spec, optimize) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt new file mode 100644 index 00000000..67f9626c --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class PerformanceInterferenceReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + init { + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): List<VmInterferenceGroup> { + return mapper.readValue(file) + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): List<VmInterferenceGroup> { + return mapper.readValue(input) + } + + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..c79f0584 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val result = PerformanceInterferenceReader().read(input) + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] |
