diff options
Diffstat (limited to 'opendc-compute')
53 files changed, 3887 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index baa1ba2f..577fbc73 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -45,7 +45,7 @@ public interface ComputeClient : AutoCloseable { * * @param name The name of the flavor. * @param cpuCount The amount of CPU cores for this flavor. - * @param memorySize The size of the memory. + * @param memorySize The size of the memory in MB. * @param labels The identifying labels of the image. * @param meta The non-identifying meta-data of the image. */ diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index e0e48b0f..33cafc45 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index 5632a55e..fc092a3f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,7 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * - * @property vcpuCount The number of logical processing cores available for this host. + * @property cpuCount The number of logical processing cores available for this host. * @property memorySize The amount of memory available for this host in MB. */ public data class HostModel(public val cpuCount: Int, public val memorySize: Long) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 8af5f86e..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,7 +22,10 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -33,6 +36,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -40,15 +44,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -61,6 +68,11 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + + /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -104,60 +116,37 @@ internal class ComputeServiceImpl( private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. - */ - private val _submittedServers = meter.longCounterBuilder("servers.submitted") - .setDescription("Number of start requests") - .setUnit("1") - .build() - - /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. + * The number of scheduling attempts. */ - private val _runningServers = meter.longUpDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of servers that have finished running. + * The response time of the service. */ - private val _finishedServers = meter.longCounterBuilder("servers.finished") - .setDescription("Number of servers that finished running") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of hosts registered at the compute service. + * The number of servers that are pending. */ - private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") - .build() - - /** - * The number of available hosts registered at the compute service. - */ - private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -170,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -297,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -325,10 +325,9 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - val request = SchedulingRequest(server) + val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -354,10 +353,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -368,12 +369,13 @@ internal class ComputeServiceImpl( * Run a single scheduling iteration. */ private fun doSchedule() { + val now = clock.millis() while (queue.isNotEmpty()) { val request = queue.peek() if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -385,12 +387,12 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } - server.state = ServerState.ERROR + server.state = ServerState.TERMINATED continue } else { break @@ -401,7 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -416,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -430,7 +438,7 @@ internal class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - internal data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) { /** * A flag to indicate that the request is cancelled. */ @@ -440,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -475,14 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -50,6 +53,21 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** + * The attributes of a server. + */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + + /** * The [Host] that has been assigned to host the server. */ internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt index 0fd5b2a4..8c2d4715 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/FilterScheduler.kt @@ -26,6 +26,8 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView import org.opendc.compute.service.scheduler.filters.HostFilter import org.opendc.compute.service.scheduler.weights.HostWeigher +import java.util.* +import kotlin.math.min /** * A [ComputeScheduler] implementation that uses filtering and weighing passes to select @@ -33,13 +35,27 @@ import org.opendc.compute.service.scheduler.weights.HostWeigher * * This implementation is based on the filter scheduler from OpenStack Nova. * See: https://docs.openstack.org/nova/latest/user/filter-scheduler.html + * + * @param filters The list of filters to apply when searching for an appropriate host. + * @param weighers The list of weighers to apply when searching for an appropriate host. + * @param subsetSize The size of the subset of best hosts from which a target is randomly chosen. + * @param random A [Random] instance for selecting */ -public class FilterScheduler(private val filters: List<HostFilter>, private val weighers: List<Pair<HostWeigher, Double>>) : ComputeScheduler { +public class FilterScheduler( + private val filters: List<HostFilter>, + private val weighers: List<HostWeigher>, + private val subsetSize: Int = 1, + private val random: Random = Random(0) +) : ComputeScheduler { /** * The pool of hosts available to the scheduler. */ private val hosts = mutableListOf<HostView>() + init { + require(subsetSize >= 1) { "Subset size must be one or greater" } + } + override fun addHost(host: HostView) { hosts.add(host) } @@ -49,18 +65,44 @@ public class FilterScheduler(private val filters: List<HostFilter>, private val } override fun select(server: Server): HostView? { - return hosts.asSequence() - .filter { host -> - for (filter in filters) { - if (!filter.test(host, server)) - return@filter false + val hosts = hosts + val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, server) } } + + val subset = if (weighers.isNotEmpty()) { + val results = weighers.map { it.getWeights(filteredHosts, server) } + val weights = DoubleArray(filteredHosts.size) + + for (result in results) { + val min = result.min + val range = (result.max - min) + + // Skip result if all weights are the same + if (range == 0.0) { + continue } - true - } - .sortedByDescending { host -> - weighers.sumByDouble { (weigher, factor) -> weigher.getWeight(host, server) * factor } + val multiplier = result.multiplier + val factor = multiplier / range + + for ((i, weight) in result.weights.withIndex()) { + weights[i] += factor * (weight - min) + } } - .firstOrNull() + + weights.indices + .asSequence() + .sortedByDescending { weights[it] } + .map { filteredHosts[it] } + .take(subsetSize) + .toList() + } else { + filteredHosts + } + + return when (val maxSize = min(subsetSize, subset.size)) { + 0 -> null + 1 -> subset[0] + else -> subset[random.nextInt(maxSize)] + } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt new file mode 100644 index 00000000..a470a453 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the memory requirements of a [Server] and the RAM available on the host. + * + * @param allocationRatio Virtual RAM to physical RAM allocation ratio. + */ +public class RamFilter(private val allocationRatio: Double) : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requested = server.flavor.memorySize + val available = host.availableMemory + val total = host.host.model.memorySize + + // Do not allow an instance to overcommit against itself, only against + // other instances. + if (requested > total) { + return false + } + + val limit = total * allocationRatio + val used = total - available + val usable = limit - used + return usable >= requested + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt index 072440c5..abdd79f1 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/ComputeCapabilitiesFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuFilter.kt @@ -26,15 +26,22 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostFilter] that checks whether the capabilities provided by the host satisfies the requirements of the server - * flavor. + * A [HostFilter] that filters hosts based on the vCPU requirements of a [Server] and the available vCPUs on the host. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ -public class ComputeCapabilitiesFilter : HostFilter { +public class VCpuFilter(private val allocationRatio: Double) : HostFilter { override fun test(host: HostView, server: Server): Boolean { - val fitsMemory = host.availableMemory >= server.flavor.memorySize - val fitsCpu = host.host.model.cpuCount >= server.flavor.cpuCount - return fitsMemory && fitsCpu - } + val requested = server.flavor.cpuCount + val total = host.host.model.cpuCount + val limit = total * allocationRatio - override fun toString(): String = "ComputeCapabilitiesFilter" + // Do not allow an instance to overcommit against itself, only against other instances + if (requested > total) { + return false + } + + val free = limit - host.provisionedCores + return free >= requested + } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt index 12e6510e..d668fdaf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreMemoryWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/CoreRamWeigher.kt @@ -27,11 +27,15 @@ import org.opendc.compute.service.internal.HostView /** * A [HostWeigher] that weighs the hosts based on the available memory per core on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available core memory, and a negative number will result in the scheduler preferring hosts with less available core + * memory. */ -public class CoreMemoryWeigher : HostWeigher { +public class CoreRamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.availableMemory.toDouble() / host.host.model.cpuCount } - override fun toString(): String = "CoreMemoryWeigher" + override fun toString(): String = "CoreRamWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt index d48ee9e0..aca8c4e6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/HostWeigher.kt @@ -29,9 +29,47 @@ import org.opendc.compute.service.scheduler.FilterScheduler /** * An interface used by the [FilterScheduler] to weigh the pool of host for a scheduling request. */ -public fun interface HostWeigher { +public interface HostWeigher { + /** + * The multiplier for the weigher. + */ + public val multiplier: Double + /** * Obtain the weight of the specified [host] when scheduling the specified [server]. */ public fun getWeight(host: HostView, server: Server): Double + + /** + * Obtain the weights for [hosts] when scheduling the specified [server]. + */ + public fun getWeights(hosts: List<HostView>, server: Server): Result { + val weights = DoubleArray(hosts.size) + var min = Double.MAX_VALUE + var max = Double.MIN_VALUE + + for ((i, host) in hosts.withIndex()) { + val weight = getWeight(host, server) + weights[i] = weight + min = kotlin.math.min(min, weight) + max = kotlin.math.max(max, weight) + } + + return Result(weights, min, max, multiplier) + } + + /** + * A result returned by the weigher. + * + * @param weights The weights returned by the weigher. + * @param min The minimum weight returned. + * @param max The maximum weight returned. + * @param multiplier The weight multiplier to use. + */ + public class Result( + public val weights: DoubleArray, + public val min: Double, + public val max: Double, + public val multiplier: Double, + ) } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt index 2ef733e5..732cbe03 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/InstanceCountWeigher.kt @@ -28,7 +28,7 @@ import org.opendc.compute.service.internal.HostView /** * A [HostWeigher] that weighs the hosts based on the number of instances on the host. */ -public class InstanceCountWeigher : HostWeigher { +public class InstanceCountWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.instanceCount.toDouble() } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt index 115d8e4d..d18d31f4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/MemoryWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RamWeigher.kt @@ -26,12 +26,15 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostWeigher] that weighs the hosts based on the available memory on the host. + * A [HostWeigher] that weighs the hosts based on the available RAM (memory) on the host. + * + * @param multiplier Weight multiplier ratio. A positive value will result in the scheduler preferring hosts with more + * available memory, and a negative number will result in the scheduler preferring hosts with less memory. */ -public class MemoryWeigher : HostWeigher { +public class RamWeigher(override val multiplier: Double = 1.0) : HostWeigher { override fun getWeight(host: HostView, server: Server): Double { return host.availableMemory.toDouble() } - override fun toString(): String = "MemoryWeigher" + override fun toString(): String = "RamWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt index df5bcd6e..4a22269b 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/ProvisionedCoresWeigher.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuWeigher.kt @@ -26,12 +26,19 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView /** - * A [HostWeigher] that weighs the hosts based on the number of provisioned cores on the host. + * A [HostWeigher] that weighs the hosts based on the remaining number of vCPUs available. + * + * @param allocationRatio Virtual CPU to physical CPU allocation ratio. */ -public class ProvisionedCoresWeigher : HostWeigher { +public class VCpuWeigher(private val allocationRatio: Double, override val multiplier: Double = 1.0) : HostWeigher { + + init { + require(allocationRatio > 0.0) { "Allocation ratio must be greater than zero" } + } + override fun getWeight(host: HostView, server: Server): Double { - return host.provisionedCores.toDouble() + return host.host.model.cpuCount * allocationRatio - host.provisionedCores } - override fun toString(): String = "ProvisionedCoresWeigher" + override fun toString(): String = "VCpuWeigher" } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index a6258845..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -37,9 +37,10 @@ import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.MemoryWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import java.util.* @@ -57,11 +58,10 @@ internal class ComputeServiceTest { scope = SimulationCoroutineScope() val clock = scope.clock val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), + weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test @@ -167,9 +167,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -180,9 +180,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -193,9 +193,9 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -207,7 +207,7 @@ internal class ComputeServiceTest { server.start() server.stop() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.TERMINATED, server.state) } @@ -228,7 +228,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(10 * 60 * 1000) + delay(10L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -254,12 +254,12 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.UP listeners.forEach { it.onStateChanged(host, HostState.UP) } - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -284,13 +284,13 @@ internal class ComputeServiceTest { val image = client.newImage("test") val server = client.newServer("test", image, flavor, start = false) - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) every { host.state } returns HostState.DOWN listeners.forEach { it.onStateChanged(host, HostState.DOWN) } server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) @@ -344,7 +344,7 @@ internal class ComputeServiceTest { // Start server server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) coVerify { host.spawn(capture(slot), true) } listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } @@ -383,7 +383,7 @@ internal class ComputeServiceTest { val server = client.newServer("test", image, flavor, start = false) server.start() - delay(5 * 60 * 1000) + delay(5L * 60 * 1000) server.refresh() assertEquals(ServerState.PROVISIONING, server.state) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 20ea8d20..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,11 +99,11 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } server.start() @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,10 +158,10 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -220,10 +221,10 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk<InternalFlavor>() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk<InternalImage>() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt new file mode 100644 index 00000000..cafd4498 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.HostModel +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.internal.HostView +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.InstanceCountFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Test suite for the [FilterScheduler]. + */ +internal class FilterSchedulerTest { + @Test + fun testInvalidSubsetSize() { + assertThrows<IllegalArgumentException> { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = 0 + ) + } + + assertThrows<IllegalArgumentException> { + FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = -2 + ) + } + } + + @Test + fun testNoHosts() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testNoFiltersAndSchedulers() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostA, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testNoFiltersAndSchedulersRandom() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(1) + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.DOWN + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + // Make sure we get the first host both times + assertAll( + { assertEquals(hostB, scheduler.select(server)) }, + { assertEquals(hostA, scheduler.select(server)) } + ) + } + + @Test + fun testHostIsDown() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.DOWN + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testHostIsUp() { + val scheduler = FilterScheduler( + filters = listOf(ComputeFilter()), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(host, scheduler.select(server)) + } + + @Test + fun testRamFilter() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 512 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 2048 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(RamFilter(1.5)), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.availableMemory } returns 2048 + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 2300 + + assertNull(scheduler.select(server)) + } + + @Test + fun testVCpuFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(1.0)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 3 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuFilterOvercommit() { + val scheduler = FilterScheduler( + filters = listOf(VCpuFilter(16.0)), + weighers = emptyList(), + ) + + val host = mockk<HostView>() + every { host.host.state } returns HostState.UP + every { host.host.model } returns HostModel(4, 2048) + every { host.provisionedCores } returns 0 + + scheduler.addHost(host) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 8 + every { server.flavor.memorySize } returns 1024 + + assertNull(scheduler.select(server)) + } + + @Test + fun testInstanceCountFilter() { + val scheduler = FilterScheduler( + filters = listOf(InstanceCountFilter(limit = 2)), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(RamWeigher(1.5)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostA, scheduler.select(server)) + } + + @Test + fun testCoreRamWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(CoreRamWeigher(1.5)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.availableMemory } returns 1024 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testVCpuWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(VCpuWeigher(16.0)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.provisionedCores } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.provisionedCores } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } + + @Test + fun testInstanceCountWeigher() { + val scheduler = FilterScheduler( + filters = emptyList(), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.instanceCount } returns 2 + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.instanceCount } returns 0 + + scheduler.addHost(hostA) + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + + assertEquals(hostB, scheduler.select(server)) + } +} diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index b31a2114..aaf69f78 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -33,11 +33,13 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcCompute.opendcComputeService) api(projects.opendcSimulator.opendcSimulatorCompute) - api(projects.opendcSimulator.opendcSimulatorFailures) + api(libs.commons.math3) implementation(projects.opendcUtils) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 68667a8c..b9d02185 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,29 +22,34 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.ScalingDriver -import org.opendc.simulator.compute.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.kernel.SimHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.PowerModel -import org.opendc.simulator.failures.FailureDomain -import java.time.Clock +import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -52,34 +57,27 @@ import kotlin.coroutines.resume public class SimHost( override val uid: UUID, override val name: String, - model: SimMachineModel, + model: MachineModel, override val meta: Map<String, Any>, context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver, + engine: FlowEngine, + meterProvider: MeterProvider, + hypervisorProvider: SimHypervisorProvider, + scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), + powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), -) : Host, FailureDomain, AutoCloseable { - - public constructor( - uid: UUID, - name: String, - model: SimMachineModel, - meta: Map<String, Any>, - context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - powerModel: PowerModel = ConstantPowerModel(0.0), - mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper) - + interferenceDomain: VmInterferenceDomain? = null, + private val optimize: Boolean = false +) : Host, AutoCloseable { /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context + Job()) + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The clock instance used by the host. + */ + private val clock = engine.clock /** * The logger instance of this server. @@ -87,52 +85,31 @@ public class SimHost( private val logger = KotlinLogging.logger {} /** - * The event listeners registered with this host. + * The [Meter] to track metrics of the simulated host. */ - private val listeners = mutableListOf<HostListener>() + private val meter = meterProvider.get("org.opendc.compute.simulator") /** - * Current total memory use of the images on this hypervisor. + * The event listeners registered with this host. */ - private var availableMemory: Long = model.memory.map { it.size }.sum() + private val listeners = mutableListOf<HostListener>() /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( - scope.coroutineContext, clock, - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - - _batch.put(_cpuWork, requestedWork.toDouble()) - _batch.put(_cpuWorkGranted, grantedWork.toDouble()) - _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble()) - _batch.put(_cpuWorkInterference, interferedWork.toDouble()) - _batch.put(_cpuUsage, cpuUsage) - _batch.put(_cpuDemand, cpuDemand) - _batch.put(_cpuPower, machine.powerDraw) - _batch.record() - } - } - ) + private val hypervisor: SimHypervisor = hypervisorProvider + .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) /** * The virtual machines running on the hypervisor. */ private val guests = HashMap<Server, Guest>() + private val _guests = mutableListOf<Guest>() override val state: HostState get() = _state @@ -144,123 +121,99 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) - - /** - * The number of guests on the host. - */ - private val _guests = meter.longUpDownCounterBuilder("guests.total") - .setDescription("Number of guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The number of active guests on the host. - */ - private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The CPU usage on the host. - */ - private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - - /** - * The CPU demand on the host. - */ - private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .build() - - /** - * The work actually performed by the CPU. - */ - private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .build() + override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The work that could not be performed by the CPU due to overcommitting resource. + * The [GuestListener] that listens for guest events. */ - private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .build() + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The work that could not be performed by the CPU due to interference. - */ - private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .build() + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } /** - * The batch recorder used to record multiple metrics atomically. + * The [Job] that represents the machine running the hypervisor. */ - private val _batch = meter.newBatchRecorder("host", uid.toString()) + private var _job: Job? = null init { - // Launch hypervisor onto machine - scope.launch { - try { - _state = HostState.UP - machine.run(this@SimHost.hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN - } - } + launch() + + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerUsage) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback { result -> result.observe(machine.energyUsage) } + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectUptime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { - val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount + val sufficientMemory = model.memorySize >= server.flavor.memorySize + val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit } override suspend fun spawn(server: Server, start: Boolean) { - // Return if the server already exists on this host - if (server in this) { - return + val guest = guests.computeIfAbsent(server) { key -> + require(canFit(key)) { "Server does not fit" } + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val newGuest = Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) + + _guests.add(newGuest) + newGuest } - require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) - guests[server] = guest - _guests.add(1) - if (start) { guest.start() } @@ -281,9 +234,8 @@ public class SimHost( } override suspend fun delete(server: Server) { - val guest = guests.remove(server) ?: return - guest.terminate() - _guests.add(-1) + val guest = guests[server] ?: return + guest.delete() } override fun addListener(listener: HostListener) { @@ -295,130 +247,233 @@ public class SimHost( } override fun close() { + reset() scope.cancel() machine.close() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + public suspend fun fail() { + reset() + + for (guest in _guests) { + guest.fail() + } + } + + public suspend fun recover() { + updateUptime() + + launch() + + // Wait for the hypervisor to launch before recovering the guests + yield() + + for (guest in _guests) { + guest.recover() + } + } + + /** + * Launch the hypervisor. + */ + private fun launch() { + check(_job == null) { "Concurrent hypervisor running" } + + // Launch hypervisor onto machine + _job = scope.launch { + try { + _bootTime = clock.millis() + _state = HostState.UP + machine.run(hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + + /** + * Reset the machine. + */ + private fun reset() { + updateUptime() + + // Stop the hypervisor + val job = _job + if (job != null) { + job.cancel() + _job = null + } + + _state = HostState.DOWN + } + /** * Convert flavor to machine model. */ - private fun Flavor.toMachineModel(): SimMachineModel { + private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - return SimMachineModel(processingUnits, memoryUnits) + return MachineModel(processingUnits, memoryUnits).optimize() } - private fun onGuestStart(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStart(vm.server.image.name) - } + /** + * Optimize the [MachineModel] for simulation. + */ + private fun MachineModel.optimize(): MachineModel { + if (!optimize) { + return this } - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStop(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStop(vm.server.image.name) + private val STATE_KEY = AttributeKey.stringKey("state") + + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") + + /** + * Helper function to collect the guest counts on this host. + */ + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ } } - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) } - override suspend fun fail() { - _state = HostState.DOWN - } + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) - override suspend fun recover() { - _state = HostState.UP + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuLimit(result) + } } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + /** - * A virtual machine instance that the driver manages. + * Helper function to track the CPU time of a machine. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + private fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = hypervisor.counters - var state: ServerState = ServerState.TERMINATED + result.observe(counters.cpuActiveTime / 1000L, _activeState) + result.observe(counters.cpuIdleTime / 1000L, _idleState) + result.observe(counters.cpuStealTime / 1000L, _stealState) + result.observe(counters.cpuLostTime / 1000L, _lostState) - suspend fun start() { - when (state) { - ServerState.TERMINATED -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } - } + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuTime(result) } + } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + private var _lastReport = clock.millis() - suspend fun terminate() { - stop() - state = ServerState.DELETED + /** + * Helper function to track the uptime of a machine. + */ + private fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + + if (_state == HostState.UP) { + _uptime += duration + } else if (_state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - job = null - } - } + val guests = _guests + for (i in guests.indices) { + guests[i].updateUptime(duration) } + } + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + + val guests = _guests + for (i in guests.indices) { + guests[i].collectUptime(result) } + } + + private var _bootTime = Long.MIN_VALUE - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } - availableMemory += server.flavor.memorySize - onGuestStop(this) + val guests = _guests + for (i in guests.indices) { + guests[i].collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt new file mode 100644 index 00000000..258ccc89 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost +import java.time.Clock + +/** + * Interface responsible for applying the fault to a host. + */ +public interface HostFault { + /** + * Apply the fault to the specified [victims]. + */ + public suspend fun apply(clock: Clock, victims: List<SimHost>) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt new file mode 100644 index 00000000..5eff439f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.internal.HostFaultInjectorImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * An interface for stochastically injecting faults into a set of hosts. + */ +public interface HostFaultInjector : AutoCloseable { + /** + * Start fault injection. + */ + public fun start() + + /** + * Stop fault injection into the system. + */ + public override fun close() + + public companion object { + /** + * Construct a new [HostFaultInjector]. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The hosts to inject the faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + iat: RealDistribution, + selector: VictimSelector, + fault: HostFault + ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt new file mode 100644 index 00000000..fc7cebfc --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.time.Clock +import kotlin.math.roundToLong + +/** + * A type of [HostFault] where the hosts are stopped and recover after some random amount of time. + */ +public class StartStopHostFault(private val duration: RealDistribution) : HostFault { + override suspend fun apply(clock: Clock, victims: List<SimHost>) { + for (host in victims) { + host.fail() + } + + val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds + + // Handle long overflow + if (clock.millis() + df <= 0) { + return + } + + delay(df) + + for (host in victims) { + host.recover() + } + } + + override fun toString(): String = "StartStopHostFault[$duration]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt new file mode 100644 index 00000000..fcd9dd7e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.util.* +import kotlin.math.roundToInt + +/** + * A [VictimSelector] that stochastically selects a set of hosts to be failed. + */ +public class StochasticVictimSelector( + private val size: RealDistribution, + private val random: Random = Random(0) +) : VictimSelector { + + override fun select(hosts: Set<SimHost>): List<SimHost> { + val n = size.sample().roundToInt() + return hosts.shuffled(random).take(n) + } + + override fun toString(): String = "StochasticVictimSelector[$size]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt new file mode 100644 index 00000000..b5610284 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost + +/** + * Interface responsible for selecting the victim(s) for fault injection. + */ +public interface VictimSelector { + /** + * Select the hosts from [hosts] where a fault will be injected. + */ + public fun select(hosts: Set<SimHost>): List<SimHost> +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..5ea1860d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,350 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.common.AttributesBuilder +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimVirtualMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. + */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = GuestAttributes(this) + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start deleted server" } + throw IllegalArgumentException("Server is deleted") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Delete the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. + */ + suspend fun delete() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * Recover the guest if it is in an error state. + */ + suspend fun recover() { + if (state != ServerState.ERROR) { + return + } + + doStart() + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Converge the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = attributes.toBuilder() + .put(STATE_KEY, "up") + .build() + private val _downState = attributes.toBuilder() + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime and downtime of the guest. + */ + fun updateUptime(duration: Long) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + } + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(result: ObservableLongMeasurement) { + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } + } + + private val _activeState = attributes.toBuilder() + .put(STATE_KEY, "active") + .build() + private val _stealState = attributes.toBuilder() + .put(STATE_KEY, "steal") + .build() + private val _lostState = attributes.toBuilder() + .put(STATE_KEY, "lost") + .build() + private val _idleState = attributes.toBuilder() + .put(STATE_KEY, "idle") + .build() + + /** + * Helper function to track the CPU time of a machine. + */ + fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = machine.counters + + result.observe(counters.cpuActiveTime / 1000, _activeState) + result.observe(counters.cpuIdleTime / 1000, _idleState) + result.observe(counters.cpuStealTime / 1000, _stealState) + result.observe(counters.cpuLostTime / 1000, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } + + /** + * An optimized [Attributes] implementation. + */ + private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { + /** + * Construct a [GuestAttributes] instance from a [Guest]. + */ + constructor(guest: Guest) : this( + guest.server.uid.toString(), + Attributes.builder() + .put(ResourceAttributes.HOST_NAME, guest.server.name) + .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) + .build() + ) + + override fun <T : Any?> get(key: AttributeKey<T>): T? { + // Optimize access to the HOST_ID key which is accessed quite often + if (key == ResourceAttributes.HOST_ID) { + @Suppress("UNCHECKED_CAST") + return uid as T? + } + return attributes.get(key) + } + + override fun toBuilder(): AttributesBuilder { + val delegate = attributes.toBuilder() + return object : AttributesBuilder { + + override fun putAll(attributes: Attributes): AttributesBuilder { + delegate.putAll(attributes) + return this + } + + override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder { + delegate.put<T>(key, value) + return this + } + + override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder { + delegate.put(key, value) + return this + } + + override fun build(): Attributes = GuestAttributes(uid, delegate.build()) + } + } + + override fun equals(other: Any?): Boolean = attributes == other + + // Cache hash code + private val _hash = attributes.hashCode() + + override fun hashCode(): Int = _hash + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt new file mode 100644 index 00000000..7d46e626 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import kotlinx.coroutines.* +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFault +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.VictimSelector +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * Internal implementation of the [HostFaultInjector] interface. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The set of hosts to inject faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ +internal class HostFaultInjectorImpl( + private val context: CoroutineContext, + private val clock: Clock, + private val hosts: Set<SimHost>, + private val iat: RealDistribution, + private val selector: VictimSelector, + private val fault: HostFault +) : HostFaultInjector { + /** + * The scope in which the injector runs. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * Start the fault injection into the system. + */ + override fun start() { + if (job != null) { + return + } + + job = scope.launch { + runInjector() + job = null + } + } + + /** + * Converge the injection process. + */ + private suspend fun runInjector() { + while (true) { + // Make sure to convert delay from hours to milliseconds + val d = (iat.sample() * 3.6e6).roundToLong() + + // Handle long overflow + if (clock.millis() + d <= 0) { + return + } + + delay(d) + + val victims = selector.select(hosts) + fault.apply(clock, victims) + } + } + + /** + * Stop the fault injector. + */ + public override fun close() { + scope.cancel() + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5594fd59..a0ff9228 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,47 +23,48 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.util.UUID +import java.time.Duration +import java.util.* import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var machineModel: SimMachineModel + private lateinit var machineModel: MachineModel @BeforeEach fun setUp() { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - machineModel = SimMachineModel( + machineModel = MachineModel( cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) @@ -74,16 +75,31 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val engine = FlowEngine(coroutineContext, clock) + val virtDriver = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), @@ -91,12 +107,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), + offset = 1 ) ) ) @@ -106,12 +123,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2) - ) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) + ), + offset = 1 ) ) ) @@ -121,20 +139,14 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() - grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() - overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() - return CompletableResultCode.ofSuccess() + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000 + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -155,18 +167,124 @@ internal class SimHostTest { } // Ensure last cycle is collected - delay(1000 * duration) + delay(1000L * duration) virtDriver.close() reader.close() assertAll( - { assertEquals(4197600, requestedWork, "Requested work does not match") }, - { assertEquals(2157600, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(658, activeTime, "Active time does not match") }, + { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } + /** + * Test failure of the host. + */ + @Test + fun testFailure() = runBlockingSimulation { + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setResource(hostResource) + .setClock(clock.toOtelClock()) + .build() + + val engine = FlowEngine(coroutineContext, clock) + val host = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) + val duration = 5 * 60L + val image = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) + ), + offset = 1 + ) + ) + ) + val flavor = MockFlavor(2, 0) + val server = MockServer(UUID.randomUUID(), "a", flavor, image) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } + + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime + } + }, + exportInterval = Duration.ofSeconds(duration) + ) + + coroutineScope { + host.spawn(server) + delay(5000L) + host.fail() + delay(duration * 1000) + host.recover() + + suspendCancellableCoroutine<Unit> { cont -> + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED) { + cont.resume(Unit) + } + } + }) + } + } + + host.close() + // Ensure last cycle is collected + delay(1000L * duration) + + reader.close() + + assertAll( + { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(624, activeTime, "Active time does not match") }, + { assertEquals(900001, uptime, "Uptime does not match") }, + { assertEquals(300000, downtime, "Downtime does not match") }, + { assertEquals(900000, guestUptime, "Guest uptime does not match") }, + { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + ) + } + private class MockFlavor( override val cpuCount: Int, override val memorySize: Long diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt new file mode 100644 index 00000000..f240a25f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.SimHost +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Test suite for [HostFaultInjector] class. + */ +internal class HostFaultInjectorTest { + /** + * Simple test case to test that nothing happens when the injector is not started. + */ + @Test + fun testInjectorNotStarted() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + coVerify(exactly = 0) { host.fail() } + coVerify(exactly = 0) { host.recover() } + + injector.close() + } + + /** + * Simple test case to test a start stop fault where the machine is stopped and started after some time. + */ + @Test + fun testInjectorStopsMachine() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { host.fail() } + coVerify(exactly = 1) { host.recover() } + } + + /** + * Simple test case to test a start stop fault where multiple machines are stopped. + */ + @Test + fun testInjectorStopsMultipleMachines() = runBlockingSimulation { + val hosts = listOf<SimHost>( + mockk(relaxUnitFun = true), + mockk(relaxUnitFun = true) + ) + + val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet()) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { hosts[0].fail() } + coVerify(exactly = 1) { hosts[1].fail() } + coVerify(exactly = 1) { hosts[0].recover() } + coVerify(exactly = 1) { hosts[1].recover() } + } + + /** + * Create a simple start stop fault injector. + */ + private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector { + val rng = Well19937c(0) + val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03) + val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25)) + val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + + return HostFaultInjector(context, clock, hosts, iat, selector, fault) + } +} diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts new file mode 100644 index 00000000..28a5e1da --- /dev/null +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating VM-based workloads with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcCompute.opendcComputeSimulator) + + implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcSimulator.opendcSimulatorCore) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) + implementation(libs.opentelemetry.semconv) + + implementation(libs.kotlin.logging) + implementation(libs.jackson.databind) + implementation(libs.jackson.module.kotlin) + implementation(kotlin("reflect")) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt new file mode 100644 index 00000000..c94f30e4 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.compute.workload + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (name) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(placements) + else -> throw IllegalArgumentException("Unknown policy $name") + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index 1615df3a..78002c2f 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/RandomWeigher.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -20,17 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.service.scheduler.weights +package org.opendc.compute.workload -import org.opendc.compute.api.Server -import org.opendc.compute.service.internal.HostView import java.util.* /** - * A [HostWeigher] that assigns random weights to each host every selection. + * An interface that describes how a workload is resolved. */ -public class RandomWeigher(private val random: Random) : HostWeigher { - override fun getWeight(host: HostView, server: Server): Double = random.nextDouble() - - override fun toString(): String = "RandomWeigher" +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt new file mode 100644 index 00000000..1a6624f7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import mu.KotlinLogging +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.trace.* +import java.io.File +import java.time.Duration +import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A helper class for loading compute workload traces into memory. + * + * @param baseDir The directory containing the traces. + */ +public class ComputeWorkloadLoader(private val baseDir: File) { + /** + * The logger for this instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() + + /** + * Read the fragments into memory. + */ + private fun parseFragments(trace: Trace): Map<String, Builder> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val durationCol = reader.resolve(RESOURCE_STATE_DURATION) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + + val fragments = mutableMapOf<String, Builder>() + + return try { + while (reader.nextRow()) { + val id = reader.get(idCol) as String + val time = reader.get(timestampCol) as Instant + val duration = reader.get(durationCol) as Duration + val cores = reader.getInt(coresCol) + val cpuUsage = reader.getDouble(usageCol) + + val timeMs = time.toEpochMilli() + val deadlineMs = timeMs + duration.toMillis() + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(timeMs, deadlineMs, cpuUsage, cores) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) + + var counter = 0 + val entries = mutableListOf<VirtualMachine>() + + return try { + while (reader.nextRow()) { + + val id = reader.get(idCol) as String + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = reader.get(startTimeCol) as Instant + val endTime = reader.get(stopTimeCol) as Instant + val maxCores = reader.getInt(coresCol) + val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad + + entries.add( + VirtualMachine( + uid, + id, + maxCores, + requiredMemory.roundToLong(), + totalLoad, + submissionTime, + endTime, + builder.build() + ) + ) + } + + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + reader.close() + } + } + + /** + * Load the trace with the specified [name] and [format]. + */ + public fun get(name: String, format: String): List<VirtualMachine> { + return cache.computeIfAbsent(name) { + val path = baseDir.resolve(it) + + logger.info { "Loading trace $it at $path" } + + val trace = Trace.open(path, format) + val fragments = parseFragments(trace) + parseMeta(trace, fragments) + } + } + + /** + * Clear the workload cache. + */ + public fun reset() { + cache.clear() + } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val duration = max(0, deadline - timestamp) + totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + builder.add(timestamp, deadline, usage, cores) + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt new file mode 100644 index 00000000..283f82fe --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.flow.FlowEngine +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to simulated VM-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param scheduler [ComputeScheduler] implementation to use for the service. + * @param failureModel A failure model to use for injecting failures. + * @param interferenceModel The model to use for performance interference. + */ +public class ComputeWorkloadRunner( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + private val failureModel: FailureModel? = null, + private val interferenceModel: VmInterferenceModel? = null, +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + public val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + public val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [FlowEngine] to simulate the hosts. + */ + private val engine = FlowEngine(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + } + + /** + * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + */ + public suspend fun run(trace: List<VirtualMachine>, seed: Long) { + val random = Random(seed) + val injector = failureModel?.createInjector(context, clock, service, random) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + if (offset < 0) { + offset = start - now + } + + // Make sure the trace entries are ordered by submission time + assert(start - offset >= 0) { "Invalid trace order" } + delay(max(0, (start - offset) - now)) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity + ), + meta = mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + client.close() + } + } + + /** + * Register a host for this simulation. + * + * @param spec The definition of the host. + * @param optimize Merge the CPU resources of the host into a single CPU resource. + * @return The [SimHost] that has been constructed by the runner. + */ + public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { + val resource = Resource.builder() + .put(HOST_ID, spec.uid.toString()) + .put(HOST_NAME, spec.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, spec.model.cpus.size) + .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + _metricProducers.add(meterProvider) + + val host = SimHost( + spec.uid, + spec.name, + spec.model, + spec.meta, + context, + engine, + meterProvider, + spec.hypervisor, + powerDriver = spec.powerDriver, + interferenceDomain = interferenceModel?.newDomain(), + optimize = optimize + ) + + hosts.add(host) + service.addHost(host) + + return host + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt new file mode 100644 index 00000000..2f4935ca --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") +package org.opendc.compute.workload + +import org.opendc.compute.workload.internal.CompositeComputeWorkload +import org.opendc.compute.workload.internal.HpcSampledComputeWorkload +import org.opendc.compute.workload.internal.LoadSampledComputeWorkload +import org.opendc.compute.workload.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt new file mode 100644 index 00000000..4d9ef15d --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +public interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt new file mode 100644 index 00000000..be7120b9 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.compute.workload + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import java.time.Clock +import java.time.Duration +import java.util.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +public fun grid5000(failureInterval: Duration): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService, + random: Random + ): HostFaultInjector { + val rng = Well19937c(random.nextLong()) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt new file mode 100644 index 00000000..5dd239f6 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.simulator.compute.workload.SimTrace +import java.time.Instant +import java.util.* + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace that belong to this VM. + */ +public data class VirtualMachine( + val uid: UUID, + val name: String, + val cpuCount: Int, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: SimTrace, +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt new file mode 100644 index 00000000..ad182d67 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import io.opentelemetry.sdk.common.CompletableResultCode +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: ServerData) { + serverWriter.write(data) + } + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun shutdown(): CompletableResultCode { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + + return CompletableResultCode.ofSuccess() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..4172d729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalOutputFile +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + */ +public abstract class ParquetDataWriter<in T>( + path: File, + private val schema: Schema, + bufferSize: Int = 4096 +) : AutoCloseable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + + /** + * An exception to be propagated to the actual writer. + */ + private var exception: Throwable? = null + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) + + /** + * Write the specified metrics to the database. + */ + public fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) + } + + /** + * Signal the writer to stop. + */ + override fun close() { + writerThread.interrupt() + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Process the specified [data] to be written to the Parquet file. + */ + private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..98a0739e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.HostData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["host_id"] = data.host.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + builder["boot_time"] = bootTime?.toEpochMilli() + + builder["cpu_count"] = data.host.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.host.memCapacity + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid + } + + override fun toString(): String = "host-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..0d11ec23 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.UUID_SCHEMA +import org.opendc.trace.util.parquet.optional +import java.io.File + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + builder["boot_time"] = bootTime?.toEpochMilli() + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_count"] = data.server.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity + } + + override fun toString(): String = "server-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredLong("scheduling_latency") + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..47824b29 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecordBuilder +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.io.File + +/** + * A Parquet event writer for [ServiceData]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { + + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError + } + + override fun toString(): String = "service-writer" + + private companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("service") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") + .endRecord() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..9b2bec55 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf<VirtualMachine>() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..52f4c672 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf<VirtualMachine>() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..ef6de729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + val res = mutableListOf<VirtualMachine>() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt new file mode 100644 index 00000000..c20cb8f3 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] from a trace. + */ +internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + return loader.get(name, format) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt new file mode 100644 index 00000000..f3dc1e9e --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.topology + +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerDriver +import java.util.* + +/** + * Description of a physical host that will be simulated by OpenDC and host the virtual machines. + * + * @param uid Unique identifier of the host. + * @param name The name of the host. + * @param meta The metadata of the host. + * @param model The physical model of the machine. + * @param powerDriver The [PowerDriver] to model the power consumption of the machine. + * @param hypervisor The hypervisor implementation to use. + */ +public data class HostSpec( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerDriver: PowerDriver, + val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt new file mode 100644 index 00000000..3b8dc918 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.topology + +/** + * Representation of the environment of the compute service, describing the physical details of every host. + */ +public interface Topology { + /** + * Resolve the [Topology] into a list of [HostSpec]s. + */ + public fun resolve(): List<HostSpec> +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt new file mode 100644 index 00000000..74f9a1f8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TopologyHelpers") +package org.opendc.compute.workload.topology + +import org.opendc.compute.workload.ComputeWorkloadRunner + +/** + * Apply the specified [topology] to the given [ComputeWorkloadRunner]. + */ +public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) { + val hosts = topology.resolve() + for (spec in hosts) { + registerHost(spec, optimize) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt new file mode 100644 index 00000000..67f9626c --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class PerformanceInterferenceReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + init { + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): List<VmInterferenceGroup> { + return mapper.readValue(file) + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): List<VmInterferenceGroup> { + return mapper.readValue(input) + } + + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..c79f0584 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val result = PerformanceInterferenceReader().read(input) + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] |
