summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-simulator')
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt504
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt305
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt38
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt117
5 files changed, 637 insertions, 328 deletions
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index cad051e6..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -40,5 +40,6 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index be6ef11e..793db907 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -26,13 +26,16 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
@@ -44,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -100,32 +103,29 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
+ private val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- _totalWork.add(requestedWork)
- _grantedWork.add(grantedWork)
- _overcommittedWork.add(overcommittedWork)
- _interferedWork.add(interferedWork)
- _cpuDemand.record(cpuDemand)
- _cpuUsage.record(cpuUsage)
- _powerUsage.record(machine.powerDraw)
-
- reportTime()
+ _cpuDemand = cpuDemand
+ _cpuUsage = cpuUsage
+
+ collectTime()
}
}
)
+ private var _cpuUsage = 0.0
+ private var _cpuDemand = 0.0
/**
* The virtual machines running on the hypervisor.
@@ -145,109 +145,23 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The total number of guests.
- */
- private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Total number of guests")
- .setUnit("1")
- .build()
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.upDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
-
- /**
- * The CPU demand of the host.
- */
- private val _cpuDemand = meter.histogramBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
-
- /**
- * The CPU usage of the host.
- */
- private val _cpuUsage = meter.histogramBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
-
- /**
- * The power usage of the host.
- */
- private val _powerUsage = meter.histogramBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
-
- /**
- * The total amount of work supplied to the CPU.
- */
- private val _totalWork = meter.counterBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The work performed by the CPU.
- */
- private val _grantedWork = meter.counterBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount not performed by the CPU due to overcommitment.
- */
- private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount of work not performed by the CPU due to interference.
- */
- private val _interferedWork = meter.counterBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .ofDoubles()
- .build()
-
- /**
- * The amount of time in the system.
+ * The [GuestListener] that listens for guest events.
*/
- private val _totalTime = meter.counterBuilder("host.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
-
- /**
- * The uptime of the host.
- */
- private val _upTime = meter.counterBuilder("host.time.up")
- .setDescription("The uptime of the host")
- .setUnit("ms")
- .build()
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The downtime of the host.
- */
- private val _downTime = meter.counterBuilder("host.time.down")
- .setDescription("The downtime of the host")
- .setUnit("ms")
- .build()
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
init {
// Launch hypervisor onto machine
scope.launch {
try {
+ _bootTime = clock.millis()
_state = HostState.UP
machine.run(this@SimHost.hypervisor, emptyMap())
} catch (_: CancellationException) {
@@ -259,29 +173,48 @@ public class SimHost(
_state = HostState.DOWN
}
}
- }
-
- private var _lastReport = clock.millis()
-
- private fun reportTime() {
- if (!scope.isActive)
- return
-
- val now = clock.millis()
- val duration = now - _lastReport
-
- _totalTime.add(duration)
- when (_state) {
- HostState.UP -> _upTime.add(duration)
- HostState.DOWN -> _downTime.add(duration)
- }
- // Track time of guests
- for (guest in guests.values) {
- guest.reportTime()
- }
-
- _lastReport = now
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback(::collectPowerTotal)
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectTime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
@@ -295,8 +228,17 @@ public class SimHost(
override suspend fun spawn(server: Server, start: Boolean) {
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- _guests.add(1)
- Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
}
if (start) {
@@ -320,7 +262,6 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- _guests.add(-1)
guest.terminate()
}
@@ -333,7 +274,7 @@ public class SimHost(
}
override fun close() {
- reportTime()
+ reset()
scope.cancel()
machine.close()
}
@@ -341,22 +282,35 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
+ reset()
+
for (guest in guests.values) {
guest.fail()
}
}
public suspend fun recover() {
- reportTime()
+ collectTime()
_state = HostState.UP
+ _bootTime = clock.millis()
+
for (guest in guests.values) {
guest.start()
}
}
/**
+ * Reset the machine.
+ */
+ private fun reset() {
+ collectTime()
+
+ _state = HostState.DOWN
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ }
+
+ /**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
@@ -368,162 +322,168 @@ public class SimHost(
return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStart(vm: Guest) {
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val STATE_KEY = AttributeKey.stringKey("state")
- private fun onGuestStop(vm: Guest) {
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to collect the guest counts on this host.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- var state: ServerState = ServerState.TERMINATED
-
- /**
- * The attributes of the guest.
- */
- val attributes: Attributes = Attributes.builder()
- .put(ResourceAttributes.HOST_NAME, server.name)
- .put(ResourceAttributes.HOST_ID, server.uid.toString())
- .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
- .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
- .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
- .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
- .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
- .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
- .build()
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("guest.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- /**
- * The uptime of the guest.
- */
- private val _runningTime = meter.counterBuilder("guest.time.running")
- .setDescription("The uptime of the guest")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- /**
- * The time the guest is in an error state.
- */
- private val _errorTime = meter.counterBuilder("guest.time.error")
- .setDescription("The time the guest is in an error state")
- .setUnit("ms")
- .build()
- .bind(attributes)
-
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ for ((_, guest) in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ else -> invalid++
}
}
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
+ }
- suspend fun terminate() {
- stop()
- machine.close()
- state = ServerState.DELETED
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
+
+ for (guest in guests.values) {
+ guest.collectCpuLimit(result)
}
+ }
- suspend fun fail() {
- if (state != ServerState.RUNNING) {
- return
- }
- stop()
- state = ServerState.ERROR
+ private var _lastCpuTimeCallback = clock.millis()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastCpuTimeCallback
+
+ try {
+ collectCpuTime(duration, result)
+ } finally {
+ _lastCpuTimeCallback = now
}
+ }
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- try {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- job = null
- }
- }
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = this.model.cpuCount
+ val d = coreCount / _cpuLimit
+
+ val counters = hypervisor.counters
+ val grantedWork = counters.actual
+ val overcommittedWork = counters.overcommit
+ val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+ val lostTime = (interferedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(lostTime, _lostState)
+
+ for (guest in guests.values) {
+ guest.collectCpuTime(duration, result)
}
+ }
+
+ private var _lastPowerCallback = clock.millis()
+ private var _totalPower = 0.0
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ /**
+ * Helper function to collect the total power usage of the machine.
+ */
+ private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastPowerCallback
+
+ _totalPower += duration / 1000.0 * machine.powerDraw
+ result.observe(_totalPower)
+
+ _lastPowerCallback = now
+ }
+
+ private var _lastReport = clock.millis()
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(result: ObservableLongMeasurement? = null) {
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ try {
+ collectTime(duration, result)
+ } finally {
+ _lastReport = now
}
+ }
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- onGuestStop(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == HostState.UP) {
+ _uptime += duration
+ } else if (state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var _lastReport = clock.millis()
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
- fun reportTime() {
- if (state == ServerState.DELETED)
- return
+ for (guest in guests.values) {
+ guest.collectUptime(duration, result)
+ }
+ }
- val now = clock.millis()
- val duration = now - _lastReport
+ private var _bootTime = Long.MIN_VALUE
- _totalTime.add(duration)
- when (state) {
- ServerState.RUNNING -> _runningTime.add(duration)
- ServerState.ERROR -> _errorTime.add(duration)
- else -> {}
- }
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
- _lastReport = now
+ for (guest in guests.values) {
+ guest.collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..90562e2f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimAbstractMachine
+import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Terminate the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun terminate() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Run the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "idle")
+ .build()
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = server.flavor.cpuCount
+ val d = coreCount / _cpuLimit
+
+ var grantedWork = 0.0
+ var overcommittedWork = 0.0
+
+ for (cpu in (machine as SimAbstractMachine).cpus) {
+ val counters = cpu.counters
+ grantedWork += counters.actual
+ overcommittedWork += counters.overcommit
+ }
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(0, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 318b5a5d..9c879e5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,11 +23,9 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -44,17 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
-import kotlin.math.roundToLong
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
private lateinit var machineModel: MachineModel
@@ -73,18 +74,23 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var totalWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val virtDriver = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
@@ -132,20 +138,16 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
- grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
- overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong()
- return CompletableResultCode.ofSuccess()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
+ }
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
+ ),
exportInterval = Duration.ofSeconds(duration)
)
@@ -172,9 +174,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, totalWork, "Requested work does not match") },
- { assertEquals(2107200, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(659, activeTime, "Active time does not match") },
+ { assertEquals(2342, idleTime, "Idle time does not match") },
+ { assertEquals(638, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -184,21 +186,26 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var totalWork = 0L
- var grantedWork = 0L
- var totalTime = 0L
- var downTime = 0L
- var guestTotalTime = 0L
- var guestDownTime = 0L
-
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val host = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
@@ -230,24 +237,22 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong()
- grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong()
- totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value
- downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value
- guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value
- guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value
-
- return CompletableResultCode.ofSuccess()
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
+ }
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
+ }
+ }
+ ),
exportInterval = Duration.ofSeconds(duration)
)
@@ -276,12 +281,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226040, totalWork, "Total time does not match") },
- { assertEquals(1086040, grantedWork, "Down time does not match") },
- { assertEquals(1200001, totalTime, "Total time does not match") },
- { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
- { assertEquals(5000, downTime, "Down time does not match") },
- { assertEquals(5000, guestDownTime, "Guest down time does not match") },
+ { assertEquals(2661, idleTime, "Idle time does not match") },
+ { assertEquals(339, activeTime, "Active time does not match") },
+ { assertEquals(1195001, uptime, "Uptime does not match") },
+ { assertEquals(5000, downtime, "Downtime does not match") },
+ { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
)
}