summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-simulator/src')
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt543
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt36
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt55
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt44
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt35
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt350
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt38
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt103
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt208
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt111
11 files changed, 1299 insertions, 289 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 68667a8c..b9d02185 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,29 +22,34 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.ScalingDriver
-import org.opendc.simulator.compute.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.kernel.SimHypervisor
+import org.opendc.simulator.compute.kernel.SimHypervisorProvider
+import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.power.PowerModel
-import org.opendc.simulator.failures.FailureDomain
-import java.time.Clock
+import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -52,34 +57,27 @@ import kotlin.coroutines.resume
public class SimHost(
override val uid: UUID,
override val name: String,
- model: SimMachineModel,
+ model: MachineModel,
override val meta: Map<String, Any>,
context: CoroutineContext,
- clock: Clock,
- meter: Meter,
- hypervisor: SimHypervisorProvider,
- scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver,
+ engine: FlowEngine,
+ meterProvider: MeterProvider,
+ hypervisorProvider: SimHypervisorProvider,
+ scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
+ powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
-) : Host, FailureDomain, AutoCloseable {
-
- public constructor(
- uid: UUID,
- name: String,
- model: SimMachineModel,
- meta: Map<String, Any>,
- context: CoroutineContext,
- clock: Clock,
- meter: Meter,
- hypervisor: SimHypervisorProvider,
- powerModel: PowerModel = ConstantPowerModel(0.0),
- mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
- ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper)
-
+ interferenceDomain: VmInterferenceDomain? = null,
+ private val optimize: Boolean = false
+) : Host, AutoCloseable {
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context + Job())
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The clock instance used by the host.
+ */
+ private val clock = engine.clock
/**
* The logger instance of this server.
@@ -87,52 +85,31 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
- * The event listeners registered with this host.
+ * The [Meter] to track metrics of the simulated host.
*/
- private val listeners = mutableListOf<HostListener>()
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
/**
- * Current total memory use of the images on this hypervisor.
+ * The event listeners registered with this host.
*/
- private var availableMemory: Long = model.memory.map { it.size }.sum()
+ private val listeners = mutableListOf<HostListener>()
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver)
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
- scope.coroutineContext, clock,
- object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
-
- _batch.put(_cpuWork, requestedWork.toDouble())
- _batch.put(_cpuWorkGranted, grantedWork.toDouble())
- _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble())
- _batch.put(_cpuWorkInterference, interferedWork.toDouble())
- _batch.put(_cpuUsage, cpuUsage)
- _batch.put(_cpuDemand, cpuDemand)
- _batch.put(_cpuPower, machine.powerDraw)
- _batch.record()
- }
- }
- )
+ private val hypervisor: SimHypervisor = hypervisorProvider
+ .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
/**
* The virtual machines running on the hypervisor.
*/
private val guests = HashMap<Server, Guest>()
+ private val _guests = mutableListOf<Guest>()
override val state: HostState
get() = _state
@@ -144,123 +121,99 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
-
- /**
- * The number of guests on the host.
- */
- private val _guests = meter.longUpDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
- .setUnit("1")
- .build()
- .bind(Labels.of("host", uid.toString()))
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
- .bind(Labels.of("host", uid.toString()))
-
- /**
- * The CPU usage on the host.
- */
- private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
-
- /**
- * The CPU demand on the host.
- */
- private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
-
- /**
- * The requested work for the CPU.
- */
- private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
-
- /**
- * The requested work for the CPU.
- */
- private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .build()
-
- /**
- * The work actually performed by the CPU.
- */
- private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .build()
+ override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The work that could not be performed by the CPU due to overcommitting resource.
+ * The [GuestListener] that listens for guest events.
*/
- private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .build()
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The work that could not be performed by the CPU due to interference.
- */
- private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .build()
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
/**
- * The batch recorder used to record multiple metrics atomically.
+ * The [Job] that represents the machine running the hypervisor.
*/
- private val _batch = meter.newBatchRecorder("host", uid.toString())
+ private var _job: Job? = null
init {
- // Launch hypervisor onto machine
- scope.launch {
- try {
- _state = HostState.UP
- machine.run(this@SimHost.hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
- }
- }
+ launch()
+
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerUsage) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback { result -> result.observe(machine.energyUsage) }
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectUptime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
- val sufficientMemory = availableMemory > server.flavor.memorySize
- val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount
+ val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
return sufficientMemory && enoughCpus && canFit
}
override suspend fun spawn(server: Server, start: Boolean) {
- // Return if the server already exists on this host
- if (server in this) {
- return
+ val guest = guests.computeIfAbsent(server) { key ->
+ require(canFit(key)) { "Server does not fit" }
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ val newGuest = Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
+
+ _guests.add(newGuest)
+ newGuest
}
- require(canFit(server)) { "Server does not fit" }
- val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
- guests[server] = guest
- _guests.add(1)
-
if (start) {
guest.start()
}
@@ -281,9 +234,8 @@ public class SimHost(
}
override suspend fun delete(server: Server) {
- val guest = guests.remove(server) ?: return
- guest.terminate()
- _guests.add(-1)
+ val guest = guests[server] ?: return
+ guest.delete()
}
override fun addListener(listener: HostListener) {
@@ -295,130 +247,233 @@ public class SimHost(
}
override fun close() {
+ reset()
scope.cancel()
machine.close()
}
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+ public suspend fun fail() {
+ reset()
+
+ for (guest in _guests) {
+ guest.fail()
+ }
+ }
+
+ public suspend fun recover() {
+ updateUptime()
+
+ launch()
+
+ // Wait for the hypervisor to launch before recovering the guests
+ yield()
+
+ for (guest in _guests) {
+ guest.recover()
+ }
+ }
+
+ /**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ check(_job == null) { "Concurrent hypervisor running" }
+
+ // Launch hypervisor onto machine
+ _job = scope.launch {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ machine.run(hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
+ /**
+ * Reset the machine.
+ */
+ private fun reset() {
+ updateUptime()
+
+ // Stop the hypervisor
+ val job = _job
+ if (job != null) {
+ job.cancel()
+ _job = null
+ }
+
+ _state = HostState.DOWN
+ }
+
/**
* Convert flavor to machine model.
*/
- private fun Flavor.toMachineModel(): SimMachineModel {
+ private fun Flavor.toMachineModel(): MachineModel {
val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
- return SimMachineModel(processingUnits, memoryUnits)
+ return MachineModel(processingUnits, memoryUnits).optimize()
}
- private fun onGuestStart(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStart(vm.server.image.name)
- }
+ /**
+ * Optimize the [MachineModel] for simulation.
+ */
+ private fun MachineModel.optimize(): MachineModel {
+ if (!optimize) {
+ return this
}
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+ val originalCpu = cpus[0]
+ val freq = cpus.sumOf { it.frequency }
+ val processingNode = originalCpu.node.copy(coreCount = 1)
+ val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode))
+
+ val memorySize = memory.sumOf { it.size }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStop(vm: Guest) {
- guests.forEach { (_, guest) ->
- if (guest.state == ServerState.RUNNING) {
- vm.performanceInterferenceModel?.onStop(vm.server.image.name)
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
+
+ /**
+ * Helper function to collect the guest counts on this host.
+ */
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ val guests = _guests.listIterator()
+ for (guest in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ ServerState.DELETED -> {
+ // Remove guests that have been deleted
+ this.guests.remove(guest.server)
+ guests.remove()
+ }
+ else -> invalid++
}
}
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
}
- override suspend fun fail() {
- _state = HostState.DOWN
- }
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
- override suspend fun recover() {
- _state = HostState.UP
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuLimit(result)
+ }
}
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to track the CPU time of a machine.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = hypervisor.counters
- var state: ServerState = ServerState.TERMINATED
+ result.observe(counters.cpuActiveTime / 1000L, _activeState)
+ result.observe(counters.cpuIdleTime / 1000L, _idleState)
+ result.observe(counters.cpuStealTime / 1000L, _stealState)
+ result.observe(counters.cpuLostTime / 1000L, _lostState)
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
- }
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectCpuTime(result)
}
+ }
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ private var _lastReport = clock.millis()
- suspend fun terminate() {
- stop()
- state = ServerState.DELETED
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun updateUptime() {
+ val now = clock.millis()
+ val duration = now - _lastReport
+ _lastReport = now
+
+ if (_state == HostState.UP) {
+ _uptime += duration
+ } else if (_state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- machine.close()
- job = null
- }
- }
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].updateUptime(duration)
}
+ }
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
+
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
+
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectUptime(result)
}
+ }
+
+ private var _bootTime = Long.MIN_VALUE
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result.observe(_bootTime)
+ }
- availableMemory += server.flavor.memorySize
- onGuestStop(this)
+ val guests = _guests
+ for (i in guests.indices) {
+ guests[i].collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
new file mode 100644
index 00000000..258ccc89
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFault.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.opendc.compute.simulator.SimHost
+import java.time.Clock
+
+/**
+ * Interface responsible for applying the fault to a host.
+ */
+public interface HostFault {
+ /**
+ * Apply the fault to the specified [victims].
+ */
+ public suspend fun apply(clock: Clock, victims: List<SimHost>)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
new file mode 100644
index 00000000..5eff439f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/HostFaultInjector.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.internal.HostFaultInjectorImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * An interface for stochastically injecting faults into a set of hosts.
+ */
+public interface HostFaultInjector : AutoCloseable {
+ /**
+ * Start fault injection.
+ */
+ public fun start()
+
+ /**
+ * Stop fault injection into the system.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [HostFaultInjector].
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The hosts to inject the faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ hosts: Set<SimHost>,
+ iat: RealDistribution,
+ selector: VictimSelector,
+ fault: HostFault
+ ): HostFaultInjector = HostFaultInjectorImpl(context, clock, hosts, iat, selector, fault)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
new file mode 100644
index 00000000..fc7cebfc
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StartStopHostFault.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import kotlinx.coroutines.delay
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import java.time.Clock
+import kotlin.math.roundToLong
+
+/**
+ * A type of [HostFault] where the hosts are stopped and recover after some random amount of time.
+ */
+public class StartStopHostFault(private val duration: RealDistribution) : HostFault {
+ override suspend fun apply(clock: Clock, victims: List<SimHost>) {
+ for (host in victims) {
+ host.fail()
+ }
+
+ val df = (duration.sample() * 1000).roundToLong() // seconds to milliseconds
+
+ // Handle long overflow
+ if (clock.millis() + df <= 0) {
+ return
+ }
+
+ delay(df)
+
+ for (host in victims) {
+ host.recover()
+ }
+ }
+
+ override fun toString(): String = "StartStopHostFault[$duration]"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
new file mode 100644
index 00000000..fcd9dd7e
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import java.util.*
+import kotlin.math.roundToInt
+
+/**
+ * A [VictimSelector] that stochastically selects a set of hosts to be failed.
+ */
+public class StochasticVictimSelector(
+ private val size: RealDistribution,
+ private val random: Random = Random(0)
+) : VictimSelector {
+
+ override fun select(hosts: Set<SimHost>): List<SimHost> {
+ val n = size.sample().roundToInt()
+ return hosts.shuffled(random).take(n)
+ }
+
+ override fun toString(): String = "StochasticVictimSelector[$size]"
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
new file mode 100644
index 00000000..b5610284
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/VictimSelector.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import org.opendc.compute.simulator.SimHost
+
+/**
+ * Interface responsible for selecting the victim(s) for fault injection.
+ */
+public interface VictimSelector {
+ /**
+ * Select the hosts from [hosts] where a fault will be injected.
+ */
+ public fun select(hosts: Set<SimHost>): List<SimHost>
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..5ea1860d
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,350 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.common.AttributesBuilder
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.kernel.SimVirtualMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimVirtualMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = GuestAttributes(this)
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start deleted server" }
+ throw IllegalArgumentException("Server is deleted")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Delete the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun delete() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * Recover the guest if it is in an error state.
+ */
+ suspend fun recover() {
+ if (state != ServerState.ERROR) {
+ return
+ }
+
+ doStart()
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Converge the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = attributes.toBuilder()
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = attributes.toBuilder()
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime and downtime of the guest.
+ */
+ fun updateUptime(duration: Long) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+ }
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(result: ObservableLongMeasurement) {
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = attributes.toBuilder()
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = attributes.toBuilder()
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = attributes.toBuilder()
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = attributes.toBuilder()
+ .put(STATE_KEY, "idle")
+ .build()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = machine.counters
+
+ result.observe(counters.cpuActiveTime / 1000, _activeState)
+ result.observe(counters.cpuIdleTime / 1000, _idleState)
+ result.observe(counters.cpuStealTime / 1000, _stealState)
+ result.observe(counters.cpuLostTime / 1000, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+
+ /**
+ * An optimized [Attributes] implementation.
+ */
+ private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes {
+ /**
+ * Construct a [GuestAttributes] instance from a [Guest].
+ */
+ constructor(guest: Guest) : this(
+ guest.server.uid.toString(),
+ Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, guest.server.name)
+ .put(ResourceAttributes.HOST_ID, guest.server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString())
+ .build()
+ )
+
+ override fun <T : Any?> get(key: AttributeKey<T>): T? {
+ // Optimize access to the HOST_ID key which is accessed quite often
+ if (key == ResourceAttributes.HOST_ID) {
+ @Suppress("UNCHECKED_CAST")
+ return uid as T?
+ }
+ return attributes.get(key)
+ }
+
+ override fun toBuilder(): AttributesBuilder {
+ val delegate = attributes.toBuilder()
+ return object : AttributesBuilder {
+
+ override fun putAll(attributes: Attributes): AttributesBuilder {
+ delegate.putAll(attributes)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder {
+ delegate.put<T>(key, value)
+ return this
+ }
+
+ override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder {
+ delegate.put(key, value)
+ return this
+ }
+
+ override fun build(): Attributes = GuestAttributes(uid, delegate.build())
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = attributes == other
+
+ // Cache hash code
+ private val _hash = attributes.hashCode()
+
+ override fun hashCode(): Int = _hash
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
new file mode 100644
index 00000000..7d46e626
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import kotlinx.coroutines.*
+import org.apache.commons.math3.distribution.RealDistribution
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFault
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.VictimSelector
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * Internal implementation of the [HostFaultInjector] interface.
+ *
+ * @param context The scope to run the fault injector in.
+ * @param clock The [Clock] to keep track of simulation time.
+ * @param hosts The set of hosts to inject faults into.
+ * @param iat The inter-arrival time distribution of the failures (in hours).
+ * @param selector The [VictimSelector] to select the host victims.
+ * @param fault The type of [HostFault] to inject.
+ */
+internal class HostFaultInjectorImpl(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val hosts: Set<SimHost>,
+ private val iat: RealDistribution,
+ private val selector: VictimSelector,
+ private val fault: HostFault
+) : HostFaultInjector {
+ /**
+ * The scope in which the injector runs.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ /**
+ * The [Job] that awaits the nearest fault in the system.
+ */
+ private var job: Job? = null
+
+ /**
+ * Start the fault injection into the system.
+ */
+ override fun start() {
+ if (job != null) {
+ return
+ }
+
+ job = scope.launch {
+ runInjector()
+ job = null
+ }
+ }
+
+ /**
+ * Converge the injection process.
+ */
+ private suspend fun runInjector() {
+ while (true) {
+ // Make sure to convert delay from hours to milliseconds
+ val d = (iat.sample() * 3.6e6).roundToLong()
+
+ // Handle long overflow
+ if (clock.millis() + d <= 0) {
+ return
+ }
+
+ delay(d)
+
+ val victims = selector.select(hosts)
+ fault.apply(clock, victims)
+ }
+ }
+
+ /**
+ * Stop the fault injector.
+ */
+ public override fun close() {
+ scope.cancel()
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 5594fd59..a0ff9228 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,47 +23,48 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
-import org.opendc.simulator.compute.SimFairShareHypervisorProvider
-import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
-import java.util.UUID
+import java.time.Duration
+import java.util.*
import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var machineModel: SimMachineModel
+ private lateinit var machineModel: MachineModel
@BeforeEach
fun setUp() {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
- machineModel = SimMachineModel(
+ machineModel = MachineModel(
cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
@@ -74,16 +75,31 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val engine = FlowEngine(coroutineContext, clock)
+ val virtDriver = SimHost(
+ uid = hostId,
+ name = "test",
+ model = machineModel,
+ meta = emptyMap(),
+ coroutineContext,
+ engine,
+ meterProvider,
+ SimFairShareHypervisorProvider()
+ )
val duration = 5 * 60L
val vmImageA = MockImage(
UUID.randomUUID(),
@@ -91,12 +107,13 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 183.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2)
),
+ offset = 1
)
)
)
@@ -106,12 +123,13 @@ internal class SimHostTest {
emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 2 * 73.0, 2)
- )
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2)
+ ),
+ offset = 1
)
)
)
@@ -121,20 +139,14 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
- grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
- overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
- return CompletableResultCode.ofSuccess()
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
},
- exportInterval = duration * 1000
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -155,18 +167,124 @@ internal class SimHostTest {
}
// Ensure last cycle is collected
- delay(1000 * duration)
+ delay(1000L * duration)
virtDriver.close()
reader.close()
assertAll(
- { assertEquals(4197600, requestedWork, "Requested work does not match") },
- { assertEquals(2157600, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(658, activeTime, "Active time does not match") },
+ { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
+ /**
+ * Test failure of the host.
+ */
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setResource(hostResource)
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val engine = FlowEngine(coroutineContext, clock)
+ val host = SimHost(
+ uid = hostId,
+ name = "test",
+ model = machineModel,
+ meta = emptyMap(),
+ coroutineContext,
+ engine,
+ meterProvider,
+ SimFairShareHypervisorProvider()
+ )
+ val duration = 5 * 60L
+ val image = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 2 * 28.0, 2),
+ SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2),
+ SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2),
+ SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2)
+ ),
+ offset = 1
+ )
+ )
+ )
+ val flavor = MockFlavor(2, 0)
+ val server = MockServer(UUID.randomUUID(), "a", flavor, image)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
+ }
+
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
+ }
+ },
+ exportInterval = Duration.ofSeconds(duration)
+ )
+
+ coroutineScope {
+ host.spawn(server)
+ delay(5000L)
+ host.fail()
+ delay(duration * 1000)
+ host.recover()
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
+ }
+
+ host.close()
+ // Ensure last cycle is collected
+ delay(1000L * duration)
+
+ reader.close()
+
+ assertAll(
+ { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(624, activeTime, "Active time does not match") },
+ { assertEquals(900001, uptime, "Uptime does not match") },
+ { assertEquals(300000, downtime, "Downtime does not match") },
+ { assertEquals(900000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
+ )
+ }
+
private class MockFlavor(
override val cpuCount: Int,
override val memorySize: Long
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
new file mode 100644
index 00000000..f240a25f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/failure/HostFaultInjectorTest.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.failure
+
+import io.mockk.coVerify
+import io.mockk.mockk
+import kotlinx.coroutines.delay
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
+import org.junit.jupiter.api.Test
+import org.opendc.compute.simulator.SimHost
+import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import java.time.Duration
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.ln
+
+/**
+ * Test suite for [HostFaultInjector] class.
+ */
+internal class HostFaultInjectorTest {
+ /**
+ * Simple test case to test that nothing happens when the injector is not started.
+ */
+ @Test
+ fun testInjectorNotStarted() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ coVerify(exactly = 0) { host.fail() }
+ coVerify(exactly = 0) { host.recover() }
+
+ injector.close()
+ }
+
+ /**
+ * Simple test case to test a start stop fault where the machine is stopped and started after some time.
+ */
+ @Test
+ fun testInjectorStopsMachine() = runBlockingSimulation {
+ val host = mockk<SimHost>(relaxUnitFun = true)
+
+ val injector = createSimpleInjector(coroutineContext, clock, setOf(host))
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { host.fail() }
+ coVerify(exactly = 1) { host.recover() }
+ }
+
+ /**
+ * Simple test case to test a start stop fault where multiple machines are stopped.
+ */
+ @Test
+ fun testInjectorStopsMultipleMachines() = runBlockingSimulation {
+ val hosts = listOf<SimHost>(
+ mockk(relaxUnitFun = true),
+ mockk(relaxUnitFun = true)
+ )
+
+ val injector = createSimpleInjector(coroutineContext, clock, hosts.toSet())
+
+ injector.start()
+
+ delay(Duration.ofDays(55).toMillis())
+
+ injector.close()
+
+ coVerify(exactly = 1) { hosts[0].fail() }
+ coVerify(exactly = 1) { hosts[1].fail() }
+ coVerify(exactly = 1) { hosts[0].recover() }
+ coVerify(exactly = 1) { hosts[1].recover() }
+ }
+
+ /**
+ * Create a simple start stop fault injector.
+ */
+ private fun createSimpleInjector(context: CoroutineContext, clock: Clock, hosts: Set<SimHost>): HostFaultInjector {
+ val rng = Well19937c(0)
+ val iat = LogNormalDistribution(rng, ln(24 * 7.0), 1.03)
+ val selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25))
+ val fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
+
+ return HostFaultInjector(context, clock, hosts, iat, selector, fault)
+ }
+}