diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
5 files changed, 637 insertions, 328 deletions
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index cad051e6..aaf69f78 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -40,5 +40,6 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index be6ef11e..793db907 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -26,13 +26,16 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimHypervisorProvider @@ -44,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.resources.SimResourceDistributorMaxMin import org.opendc.simulator.resources.SimResourceInterpreter import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlin.math.roundToLong /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -100,32 +103,29 @@ public class SimHost( /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( + private val hypervisor: SimHypervisor = hypervisor.create( interpreter, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - _totalWork.add(requestedWork) - _grantedWork.add(grantedWork) - _overcommittedWork.add(overcommittedWork) - _interferedWork.add(interferedWork) - _cpuDemand.record(cpuDemand) - _cpuUsage.record(cpuUsage) - _powerUsage.record(machine.powerDraw) - - reportTime() + _cpuDemand = cpuDemand + _cpuUsage = cpuUsage + + collectTime() } } ) + private var _cpuUsage = 0.0 + private var _cpuDemand = 0.0 /** * The virtual machines running on the hypervisor. @@ -145,109 +145,23 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The total number of guests. - */ - private val _guests = meter.upDownCounterBuilder("guests.total") - .setDescription("Total number of guests") - .setUnit("1") - .build() - - /** - * The number of active guests on the host. - */ - private val _activeGuests = meter.upDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - - /** - * The CPU demand of the host. - */ - private val _cpuDemand = meter.histogramBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - - /** - * The CPU usage of the host. - */ - private val _cpuUsage = meter.histogramBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - - /** - * The power usage of the host. - */ - private val _powerUsage = meter.histogramBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - - /** - * The total amount of work supplied to the CPU. - */ - private val _totalWork = meter.counterBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The work performed by the CPU. - */ - private val _grantedWork = meter.counterBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount not performed by the CPU due to overcommitment. - */ - private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount of work not performed by the CPU due to interference. - */ - private val _interferedWork = meter.counterBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount of time in the system. + * The [GuestListener] that listens for guest events. */ - private val _totalTime = meter.counterBuilder("host.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - - /** - * The uptime of the host. - */ - private val _upTime = meter.counterBuilder("host.time.up") - .setDescription("The uptime of the host") - .setUnit("ms") - .build() + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The downtime of the host. - */ - private val _downTime = meter.counterBuilder("host.time.down") - .setDescription("The downtime of the host") - .setUnit("ms") - .build() + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } init { // Launch hypervisor onto machine scope.launch { try { + _bootTime = clock.millis() _state = HostState.UP machine.run(this@SimHost.hypervisor, emptyMap()) } catch (_: CancellationException) { @@ -259,29 +173,48 @@ public class SimHost( _state = HostState.DOWN } } - } - - private var _lastReport = clock.millis() - - private fun reportTime() { - if (!scope.isActive) - return - - val now = clock.millis() - val duration = now - _lastReport - - _totalTime.add(duration) - when (_state) { - HostState.UP -> _upTime.add(duration) - HostState.DOWN -> _downTime.add(duration) - } - // Track time of guests - for (guest in guests.values) { - guest.reportTime() - } - - _lastReport = now + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(_cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(_cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerDraw) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback(::collectPowerTotal) + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectTime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -295,8 +228,17 @@ public class SimHost( override suspend fun spawn(server: Server, start: Boolean) { val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - _guests.add(1) - Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name)) + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) } if (start) { @@ -320,7 +262,6 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return - _guests.add(-1) guest.terminate() } @@ -333,7 +274,7 @@ public class SimHost( } override fun close() { - reportTime() + reset() scope.cancel() machine.close() } @@ -341,22 +282,35 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { - reportTime() - _state = HostState.DOWN + reset() + for (guest in guests.values) { guest.fail() } } public suspend fun recover() { - reportTime() + collectTime() _state = HostState.UP + _bootTime = clock.millis() + for (guest in guests.values) { guest.start() } } /** + * Reset the machine. + */ + private fun reset() { + collectTime() + + _state = HostState.DOWN + _cpuUsage = 0.0 + _cpuDemand = 0.0 + } + + /** * Convert flavor to machine model. */ private fun Flavor.toMachineModel(): MachineModel { @@ -368,162 +322,168 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStart(vm: Guest) { - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val STATE_KEY = AttributeKey.stringKey("state") - private fun onGuestStop(vm: Guest) { - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") /** - * A virtual machine instance that the driver manages. + * Helper function to collect the guest counts on this host. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - var state: ServerState = ServerState.TERMINATED - - /** - * The attributes of the guest. - */ - val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.HOST_NAME, server.name) - .put(ResourceAttributes.HOST_ID, server.uid.toString()) - .put(ResourceAttributes.HOST_TYPE, server.flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) - .build() - - /** - * The amount of time in the system. - */ - private val _totalTime = meter.counterBuilder("guest.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - .bind(attributes) - - /** - * The uptime of the guest. - */ - private val _runningTime = meter.counterBuilder("guest.time.running") - .setDescription("The uptime of the guest") - .setUnit("ms") - .build() - .bind(attributes) - - /** - * The time the guest is in an error state. - */ - private val _errorTime = meter.counterBuilder("guest.time.error") - .setDescription("The time the guest is in an error state") - .setUnit("ms") - .build() - .bind(attributes) - - suspend fun start() { - when (state) { - ServerState.TERMINATED, ServerState.ERROR -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + for ((_, guest) in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + else -> invalid++ } } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) + } - suspend fun terminate() { - stop() - machine.close() - state = ServerState.DELETED + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) + + for (guest in guests.values) { + guest.collectCpuLimit(result) } + } - suspend fun fail() { - if (state != ServerState.RUNNING) { - return - } - stop() - state = ServerState.ERROR + private var _lastCpuTimeCallback = clock.millis() + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(result: ObservableLongMeasurement) { + val now = clock.millis() + val duration = now - _lastCpuTimeCallback + + try { + collectCpuTime(duration, result) + } finally { + _lastCpuTimeCallback = now } + } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - try { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - } catch (e: Throwable) { - cont.resumeWithException(e) - } - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - job = null - } - } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = this.model.cpuCount + val d = coreCount / _cpuLimit + + val counters = hypervisor.counters + val grantedWork = counters.actual + val overcommittedWork = counters.overcommit + val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0 + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + val lostTime = (interferedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(lostTime, _lostState) + + for (guest in guests.values) { + guest.collectCpuTime(duration, result) } + } + + private var _lastPowerCallback = clock.millis() + private var _totalPower = 0.0 - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + /** + * Helper function to collect the total power usage of the machine. + */ + private fun collectPowerTotal(result: ObservableDoubleMeasurement) { + val now = clock.millis() + val duration = now - _lastPowerCallback + + _totalPower += duration / 1000.0 * machine.powerDraw + result.observe(_totalPower) + + _lastPowerCallback = now + } + + private var _lastReport = clock.millis() + + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(result: ObservableLongMeasurement? = null) { + val now = clock.millis() + val duration = now - _lastReport + + try { + collectTime(duration, result) + } finally { + _lastReport = now } + } - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - onGuestStop(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) { + if (state == HostState.UP) { + _uptime += duration + } else if (state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var _lastReport = clock.millis() + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) - fun reportTime() { - if (state == ServerState.DELETED) - return + for (guest in guests.values) { + guest.collectUptime(duration, result) + } + } - val now = clock.millis() - val duration = now - _lastReport + private var _bootTime = Long.MIN_VALUE - _totalTime.add(duration) - when (state) { - ServerState.RUNNING -> _runningTime.add(duration) - ServerState.ERROR -> _errorTime.add(duration) - else -> {} - } + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } - _lastReport = now + for (guest in guests.values) { + guest.collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..90562e2f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.SimAbstractMachine +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. + */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, server.name) + .put(ResourceAttributes.HOST_ID, server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) + .build() + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Terminate the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. + */ + suspend fun terminate() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Run the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "up") + .build() + private val _downState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } + } + + private val _activeState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "active") + .build() + private val _stealState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "steal") + .build() + private val _lostState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "lost") + .build() + private val _idleState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "idle") + .build() + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. + */ + fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = server.flavor.cpuCount + val d = coreCount / _cpuLimit + + var grantedWork = 0.0 + var overcommittedWork = 0.0 + + for (cpu in (machine as SimAbstractMachine).cpus) { + val counters = cpu.counters + grantedWork += counters.actual + overcommittedWork += counters.overcommit + } + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(0, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 318b5a5d..9c879e5e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,11 +23,9 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -44,17 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration import java.util.* import kotlin.coroutines.resume -import kotlin.math.roundToLong /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { private lateinit var machineModel: MachineModel @@ -73,18 +74,23 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var totalWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val virtDriver = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = "test", model = machineModel, meta = emptyMap(), @@ -132,20 +138,16 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() - grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() - overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong() - return CompletableResultCode.ofSuccess() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime + } } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, + ), exportInterval = Duration.ofSeconds(duration) ) @@ -172,9 +174,9 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4147200, totalWork, "Requested work does not match") }, - { assertEquals(2107200, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(659, activeTime, "Active time does not match") }, + { assertEquals(2342, idleTime, "Idle time does not match") }, + { assertEquals(638, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -184,21 +186,26 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var totalWork = 0L - var grantedWork = 0L - var totalTime = 0L - var downTime = 0L - var guestTotalTime = 0L - var guestDownTime = 0L - + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val host = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = "test", model = machineModel, meta = emptyMap(), @@ -230,24 +237,22 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() - grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() - totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value - downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value - guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value - guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value - - return CompletableResultCode.ofSuccess() - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime + } + } + ), exportInterval = Duration.ofSeconds(duration) ) @@ -276,12 +281,12 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2226040, totalWork, "Total time does not match") }, - { assertEquals(1086040, grantedWork, "Down time does not match") }, - { assertEquals(1200001, totalTime, "Total time does not match") }, - { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, - { assertEquals(5000, downTime, "Down time does not match") }, - { assertEquals(5000, guestDownTime, "Guest down time does not match") }, + { assertEquals(2661, idleTime, "Idle time does not match") }, + { assertEquals(339, activeTime, "Active time does not match") }, + { assertEquals(1195001, uptime, "Uptime does not match") }, + { assertEquals(5000, downtime, "Downtime does not match") }, + { assertEquals(1195000, guestUptime, "Guest uptime does not match") }, + { assertEquals(5000, guestDowntime, "Guest downtime does not match") }, ) } |
