diff options
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src')
11 files changed, 1299 insertions, 289 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 68667a8c..b9d02185 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,29 +22,34 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.ScalingDriver -import org.opendc.simulator.compute.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.kernel.SimHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.PowerModel -import org.opendc.simulator.failures.FailureDomain -import java.time.Clock +import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -52,34 +57,27 @@ import kotlin.coroutines.resume public class SimHost( override val uid: UUID, override val name: String, - model: SimMachineModel, + model: MachineModel, override val meta: Map<String, Any>, context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver, + engine: FlowEngine, + meterProvider: MeterProvider, + hypervisorProvider: SimHypervisorProvider, + scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), + powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), -) : Host, FailureDomain, AutoCloseable { - - public constructor( - uid: UUID, - name: String, - model: SimMachineModel, - meta: Map<String, Any>, - context: CoroutineContext, - clock: Clock, - meter: Meter, - hypervisor: SimHypervisorProvider, - powerModel: PowerModel = ConstantPowerModel(0.0), - mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper) - + interferenceDomain: VmInterferenceDomain? = null, + private val optimize: Boolean = false +) : Host, AutoCloseable { /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context + Job()) + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The clock instance used by the host. + */ + private val clock = engine.clock /** * The logger instance of this server. @@ -87,52 +85,31 @@ public class SimHost( private val logger = KotlinLogging.logger {} /** - * The event listeners registered with this host. + * The [Meter] to track metrics of the simulated host. */ - private val listeners = mutableListOf<HostListener>() + private val meter = meterProvider.get("org.opendc.compute.simulator") /** - * Current total memory use of the images on this hypervisor. + * The event listeners registered with this host. */ - private var availableMemory: Long = model.memory.map { it.size }.sum() + private val listeners = mutableListOf<HostListener>() /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( - scope.coroutineContext, clock, - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - - _batch.put(_cpuWork, requestedWork.toDouble()) - _batch.put(_cpuWorkGranted, grantedWork.toDouble()) - _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble()) - _batch.put(_cpuWorkInterference, interferedWork.toDouble()) - _batch.put(_cpuUsage, cpuUsage) - _batch.put(_cpuDemand, cpuDemand) - _batch.put(_cpuPower, machine.powerDraw) - _batch.record() - } - } - ) + private val hypervisor: SimHypervisor = hypervisorProvider + .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) /** * The virtual machines running on the hypervisor. */ private val guests = HashMap<Server, Guest>() + private val _guests = mutableListOf<Guest>() override val state: HostState get() = _state @@ -144,123 +121,99 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) - - /** - * The number of guests on the host. - */ - private val _guests = meter.longUpDownCounterBuilder("guests.total") - .setDescription("Number of guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The number of active guests on the host. - */ - private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - .bind(Labels.of("host", uid.toString())) - - /** - * The CPU usage on the host. - */ - private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - - /** - * The CPU demand on the host. - */ - private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - - /** - * The requested work for the CPU. - */ - private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .build() - - /** - * The work actually performed by the CPU. - */ - private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .build() + override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The work that could not be performed by the CPU due to overcommitting resource. + * The [GuestListener] that listens for guest events. */ - private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .build() + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The work that could not be performed by the CPU due to interference. - */ - private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .build() + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } /** - * The batch recorder used to record multiple metrics atomically. + * The [Job] that represents the machine running the hypervisor. */ - private val _batch = meter.newBatchRecorder("host", uid.toString()) + private var _job: Job? = null init { - // Launch hypervisor onto machine - scope.launch { - try { - _state = HostState.UP - machine.run(this@SimHost.hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN - } - } + launch() + + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerUsage) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback { result -> result.observe(machine.energyUsage) } + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectUptime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { - val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount + val sufficientMemory = model.memorySize >= server.flavor.memorySize + val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit } override suspend fun spawn(server: Server, start: Boolean) { - // Return if the server already exists on this host - if (server in this) { - return + val guest = guests.computeIfAbsent(server) { key -> + require(canFit(key)) { "Server does not fit" } + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val newGuest = Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) + + _guests.add(newGuest) + newGuest } - require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) - guests[server] = guest - _guests.add(1) - if (start) { guest.start() } @@ -281,9 +234,8 @@ public class SimHost( } override suspend fun delete(server: Server) { - val guest = guests.remove(server) ?: return - guest.terminate() - _guests.add(-1) + val guest = guests[server] ?: return + guest.delete() } override fun addListener(listener: HostListener) { @@ -295,130 +247,233 @@ public class SimHost( } override fun close() { + reset() scope.cancel() machine.close() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + public suspend fun fail() { + reset() + + for (guest in _guests) { + guest.fail() + } + } + + public suspend fun recover() { + updateUptime() + + launch() + + // Wait for the hypervisor to launch before recovering the guests + yield() + + for (guest in _guests) { + guest.recover() + } + } + + /** + * Launch the hypervisor. + */ + private fun launch() { + check(_job == null) { "Concurrent hypervisor running" } + + // Launch hypervisor onto machine + _job = scope.launch { + try { + _bootTime = clock.millis() + _state = HostState.UP + machine.run(hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + + /** + * Reset the machine. + */ + private fun reset() { + updateUptime() + + // Stop the hypervisor + val job = _job + if (job != null) { + job.cancel() + _job = null + } + + _state = HostState.DOWN + } + /** * Convert flavor to machine model. */ - private fun Flavor.toMachineModel(): SimMachineModel { + private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - return SimMachineModel(processingUnits, memoryUnits) + return MachineModel(processingUnits, memoryUnits).optimize() } - private fun onGuestStart(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStart(vm.server.image.name) - } + /** + * Optimize the [MachineModel] for simulation. + */ + private fun MachineModel.optimize(): MachineModel { + if (!optimize) { + return this } - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStop(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStop(vm.server.image.name) + private val STATE_KEY = AttributeKey.stringKey("state") + + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") + + /** + * Helper function to collect the guest counts on this host. + */ + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ } } - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) } - override suspend fun fail() { - _state = HostState.DOWN - } + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) - override suspend fun recover() { - _state = HostState.UP + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuLimit(result) + } } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + /** - * A virtual machine instance that the driver manages. + * Helper function to track the CPU time of a machine. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + private fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = hypervisor.counters - var state: ServerState = ServerState.TERMINATED + result.observe(counters.cpuActiveTime / 1000L, _activeState) + result.observe(counters.cpuIdleTime / 1000L, _idleState) + result.observe(counters.cpuStealTime / 1000L, _stealState) + result.observe(counters.cpuLostTime / 1000L, _lostState) - suspend fun start() { - when (state) { - ServerState.TERMINATED -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } - } + val guests = _guests + for (i in guests.indices) { + guests[i].collectCpuTime(result) } + } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + private var _lastReport = clock.millis() - suspend fun terminate() { - stop() - state = ServerState.DELETED + /** + * Helper function to track the uptime of a machine. + */ + private fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + + if (_state == HostState.UP) { + _uptime += duration + } else if (_state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - job = null - } - } + val guests = _guests + for (i in guests.indices) { + guests[i].updateUptime(duration) } + } + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + + val guests = _guests + for (i in guests.indices) { + guests[i].collectUptime(result) } + } + + private var _bootTime = Long.MIN_VALUE - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } - availableMemory += server.flavor.memorySize - onGuestStop(this) + val guests = _guests + for (i in guests.indices) { + guests[i].collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt new file mode 100644 index 00000000..258ccc89 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost +import java.time.Clock + +/** + * Interface responsible for applying the fault to a host. + */ +public interface HostFault { + /** + * Apply the fault to the specified [victims]. + */ + public suspend fun apply(clock: Clock, victims: List<SimHost>) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt new file mode 100644 index 00000000..5eff439f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.internal.HostFaultInjectorImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * An interface for stochastically injecting faults into a set of hosts. + */ +public interface HostFaultInjector : AutoCloseable { + /** + * Start fault injection. + */ + public fun start() + + /** + * Stop fault injection into the system. + */ + public override fun close() + + public companion object { + /** + * Construct a new [HostFaultInjector]. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The hosts to inject the faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + iat: RealDistribution, + selector: VictimSelector, + fault: HostFault + ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt new file mode 100644 index 00000000..fc7cebfc --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.time.Clock +import kotlin.math.roundToLong + +/** + * A type of [HostFault] where the hosts are stopped and recover after some random amount of time. + */ +public class StartStopHostFault(private val duration: RealDistribution) : HostFault { + override suspend fun apply(clock: Clock, victims: List<SimHost>) { + for (host in victims) { + host.fail() + } + + val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds + + // Handle long overflow + if (clock.millis() + df <= 0) { + return + } + + delay(df) + + for (host in victims) { + host.recover() + } + } + + override fun toString(): String = "StartStopHostFault[$duration]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt new file mode 100644 index 00000000..fcd9dd7e --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import java.util.* +import kotlin.math.roundToInt + +/** + * A [VictimSelector] that stochastically selects a set of hosts to be failed. + */ +public class StochasticVictimSelector( + private val size: RealDistribution, + private val random: Random = Random(0) +) : VictimSelector { + + override fun select(hosts: Set<SimHost>): List<SimHost> { + val n = size.sample().roundToInt() + return hosts.shuffled(random).take(n) + } + + override fun toString(): String = "StochasticVictimSelector[$size]" +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt new file mode 100644 index 00000000..b5610284 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import org.opendc.compute.simulator.SimHost + +/** + * Interface responsible for selecting the victim(s) for fault injection. + */ +public interface VictimSelector { + /** + * Select the hosts from [hosts] where a fault will be injected. + */ + public fun select(hosts: Set<SimHost>): List<SimHost> +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..5ea1860d --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,350 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.common.AttributesBuilder +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimVirtualMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. + */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = GuestAttributes(this) + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start deleted server" } + throw IllegalArgumentException("Server is deleted") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Delete the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. + */ + suspend fun delete() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * Recover the guest if it is in an error state. + */ + suspend fun recover() { + if (state != ServerState.ERROR) { + return + } + + doStart() + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Converge the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = attributes.toBuilder() + .put(STATE_KEY, "up") + .build() + private val _downState = attributes.toBuilder() + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime and downtime of the guest. + */ + fun updateUptime(duration: Long) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + } + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(result: ObservableLongMeasurement) { + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement) { + if (_bootTime != Long.MIN_VALUE) { + result.observe(_bootTime) + } + } + + private val _activeState = attributes.toBuilder() + .put(STATE_KEY, "active") + .build() + private val _stealState = attributes.toBuilder() + .put(STATE_KEY, "steal") + .build() + private val _lostState = attributes.toBuilder() + .put(STATE_KEY, "lost") + .build() + private val _idleState = attributes.toBuilder() + .put(STATE_KEY, "idle") + .build() + + /** + * Helper function to track the CPU time of a machine. + */ + fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = machine.counters + + result.observe(counters.cpuActiveTime / 1000, _activeState) + result.observe(counters.cpuIdleTime / 1000, _idleState) + result.observe(counters.cpuStealTime / 1000, _stealState) + result.observe(counters.cpuLostTime / 1000, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } + + /** + * An optimized [Attributes] implementation. + */ + private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { + /** + * Construct a [GuestAttributes] instance from a [Guest]. + */ + constructor(guest: Guest) : this( + guest.server.uid.toString(), + Attributes.builder() + .put(ResourceAttributes.HOST_NAME, guest.server.name) + .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) + .build() + ) + + override fun <T : Any?> get(key: AttributeKey<T>): T? { + // Optimize access to the HOST_ID key which is accessed quite often + if (key == ResourceAttributes.HOST_ID) { + @Suppress("UNCHECKED_CAST") + return uid as T? + } + return attributes.get(key) + } + + override fun toBuilder(): AttributesBuilder { + val delegate = attributes.toBuilder() + return object : AttributesBuilder { + + override fun putAll(attributes: Attributes): AttributesBuilder { + delegate.putAll(attributes) + return this + } + + override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder { + delegate.put<T>(key, value) + return this + } + + override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder { + delegate.put(key, value) + return this + } + + override fun build(): Attributes = GuestAttributes(uid, delegate.build()) + } + } + + override fun equals(other: Any?): Boolean = attributes == other + + // Cache hash code + private val _hash = attributes.hashCode() + + override fun hashCode(): Int = _hash + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt new file mode 100644 index 00000000..7d46e626 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import kotlinx.coroutines.* +import org.apache.commons.math3.distribution.RealDistribution +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFault +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.VictimSelector +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * Internal implementation of the [HostFaultInjector] interface. + * + * @param context The scope to run the fault injector in. + * @param clock The [Clock] to keep track of simulation time. + * @param hosts The set of hosts to inject faults into. + * @param iat The inter-arrival time distribution of the failures (in hours). + * @param selector The [VictimSelector] to select the host victims. + * @param fault The type of [HostFault] to inject. + */ +internal class HostFaultInjectorImpl( + private val context: CoroutineContext, + private val clock: Clock, + private val hosts: Set<SimHost>, + private val iat: RealDistribution, + private val selector: VictimSelector, + private val fault: HostFault +) : HostFaultInjector { + /** + * The scope in which the injector runs. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * The [Job] that awaits the nearest fault in the system. + */ + private var job: Job? = null + + /** + * Start the fault injection into the system. + */ + override fun start() { + if (job != null) { + return + } + + job = scope.launch { + runInjector() + job = null + } + } + + /** + * Converge the injection process. + */ + private suspend fun runInjector() { + while (true) { + // Make sure to convert delay from hours to milliseconds + val d = (iat.sample() * 3.6e6).roundToLong() + + // Handle long overflow + if (clock.millis() + d <= 0) { + return + } + + delay(d) + + val victims = selector.select(hosts) + fault.apply(clock, victims) + } + } + + /** + * Stop the fault injector. + */ + public override fun close() { + scope.cancel() + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5594fd59..a0ff9228 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,47 +23,48 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.util.UUID +import java.time.Duration +import java.util.* import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var machineModel: SimMachineModel + private lateinit var machineModel: MachineModel @BeforeEach fun setUp() { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) - machineModel = SimMachineModel( + machineModel = MachineModel( cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) @@ -74,16 +75,31 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val engine = FlowEngine(coroutineContext, clock) + val virtDriver = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), @@ -91,12 +107,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), + offset = 1 ) ) ) @@ -106,12 +123,13 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2) - ) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) + ), + offset = 1 ) ) ) @@ -121,20 +139,14 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() - grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() - overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() - return CompletableResultCode.ofSuccess() + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000 + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -155,18 +167,124 @@ internal class SimHostTest { } // Ensure last cycle is collected - delay(1000 * duration) + delay(1000L * duration) virtDriver.close() reader.close() assertAll( - { assertEquals(4197600, requestedWork, "Requested work does not match") }, - { assertEquals(2157600, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(658, activeTime, "Active time does not match") }, + { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } + /** + * Test failure of the host. + */ + @Test + fun testFailure() = runBlockingSimulation { + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setResource(hostResource) + .setClock(clock.toOtelClock()) + .build() + + val engine = FlowEngine(coroutineContext, clock) + val host = SimHost( + uid = hostId, + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + engine, + meterProvider, + SimFairShareHypervisorProvider() + ) + val duration = 5 * 60L + val image = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) + ), + offset = 1 + ) + ) + ) + val flavor = MockFlavor(2, 0) + val server = MockServer(UUID.randomUUID(), "a", flavor, image) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } + + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime + } + }, + exportInterval = Duration.ofSeconds(duration) + ) + + coroutineScope { + host.spawn(server) + delay(5000L) + host.fail() + delay(duration * 1000) + host.recover() + + suspendCancellableCoroutine<Unit> { cont -> + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED) { + cont.resume(Unit) + } + } + }) + } + } + + host.close() + // Ensure last cycle is collected + delay(1000L * duration) + + reader.close() + + assertAll( + { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(624, activeTime, "Active time does not match") }, + { assertEquals(900001, uptime, "Uptime does not match") }, + { assertEquals(300000, downtime, "Downtime does not match") }, + { assertEquals(900000, guestUptime, "Guest uptime does not match") }, + { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + ) + } + private class MockFlavor( override val cpuCount: Int, override val memorySize: Long diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt new file mode 100644 index 00000000..f240a25f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.failure + +import io.mockk.coVerify +import io.mockk.mockk +import kotlinx.coroutines.delay +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.junit.jupiter.api.Test +import org.opendc.compute.simulator.SimHost +import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln + +/** + * Test suite for [HostFaultInjector] class. + */ +internal class HostFaultInjectorTest { + /** + * Simple test case to test that nothing happens when the injector is not started. + */ + @Test + fun testInjectorNotStarted() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + coVerify(exactly = 0) { host.fail() } + coVerify(exactly = 0) { host.recover() } + + injector.close() + } + + /** + * Simple test case to test a start stop fault where the machine is stopped and started after some time. + */ + @Test + fun testInjectorStopsMachine() = runBlockingSimulation { + val host = mockk<SimHost>(relaxUnitFun = true) + + val injector = createSimpleInjector(coroutineContext, clock, setOf(host)) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { host.fail() } + coVerify(exactly = 1) { host.recover() } + } + + /** + * Simple test case to test a start stop fault where multiple machines are stopped. + */ + @Test + fun testInjectorStopsMultipleMachines() = runBlockingSimulation { + val hosts = listOf<SimHost>( + mockk(relaxUnitFun = true), + mockk(relaxUnitFun = true) + ) + + val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet()) + + injector.start() + + delay(Duration.ofDays(55).toMillis()) + + injector.close() + + coVerify(exactly = 1) { hosts[0].fail() } + coVerify(exactly = 1) { hosts[1].fail() } + coVerify(exactly = 1) { hosts[0].recover() } + coVerify(exactly = 1) { hosts[1].recover() } + } + + /** + * Create a simple start stop fault injector. + */ + private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector { + val rng = Well19937c(0) + val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03) + val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25)) + val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + + return HostFaultInjector(context, clock, hosts, iat, selector, fault) + } +} |
