summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 17:12:20 +0200
committerGitHub <noreply@github.com>2021-09-17 17:12:20 +0200
commitc1b9719aad10566c9d17f9eb757236c58a602b89 (patch)
tree2755ea2d44256116e6dc08a57a64b37a36331249 /opendc-compute
parent2cd3bd18e548a72d64afe0e7f59487f4747d722f (diff)
parente2537c59bef0645b948e92553cc5a42a8c0f7256 (diff)
merge: Standardize simulator metrics
This pull request standardizes the metrics emitted by the simulator based on OpenTelemetry conventions. From now on, all metrics exposed by the simulator are exported through OpenTelemetry following the recommended practices for naming, collection, etc. **Implementation Notes** - Improve ParquetDataWriter implementation - Simplify CoroutineMetricReader - Create separate MeterProvider per service/host - Standardize compute scheduler metrics - Standardize SimHost metrics - Use logical types for Parquet output columns **External Dependencies** - Update to OpenTelemetry 1.6.0 **Breaking API Changes** - Instead of supplying a `Meter` instances, key classes are now responsible for constructing a `Meter` instance from the supplied `MeterProvider`. - Export format has been changed to suit the outputted metrics - Energy experiments shell has been removed
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt10
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt159
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt18
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt81
-rw-r--r--opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt509
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt305
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt38
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt134
10 files changed, 795 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f1c055d4..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,9 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -35,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -42,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -63,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -106,69 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.counterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.counterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.upDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _runningServers = meter.upDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that have finished running.
+ * The number of scheduling attempts.
*/
- private val _finishedServers = meter.counterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of hosts registered at the compute service.
+ * The response time of the service.
*/
- private val _hostCount = meter.upDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of available hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _availableHostCount = meter.upDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
-
- /**
- * The response time of the service.
- */
- private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
- .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
- .ofLongs()
- .setUnit("ms")
- .build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -181,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -308,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -338,8 +327,7 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -365,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -385,7 +375,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -397,10 +387,10 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
server.state = ServerState.TERMINATED
continue
@@ -413,8 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
- _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -429,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -453,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -488,16 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.ERROR) {
- _runningServers.add(-1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index d036ec00..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -61,8 +61,7 @@ internal class ComputeServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 28fd8217..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,8 +99,8 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,8 +158,8 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,8 +221,8 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index cad051e6..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -40,5 +40,6 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index a1cc3390..793db907 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -25,13 +25,17 @@ package org.opendc.compute.simulator
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
@@ -43,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -59,7 +63,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- private val meter: Meter,
+ meterProvider: MeterProvider,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -82,6 +86,11 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the simulated host.
+ */
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
+
+ /**
* The event listeners registered with this host.
*/
private val listeners = mutableListOf<HostListener>()
@@ -94,32 +103,29 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
+ private val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- _totalWork.add(requestedWork)
- _grantedWork.add(grantedWork)
- _overcommittedWork.add(overcommittedWork)
- _interferedWork.add(interferedWork)
- _cpuDemand.record(cpuDemand)
- _cpuUsage.record(cpuUsage)
- _powerUsage.record(machine.powerDraw)
-
- reportTime()
+ _cpuDemand = cpuDemand
+ _cpuUsage = cpuUsage
+
+ collectTime()
}
}
)
+ private var _cpuUsage = 0.0
+ private var _cpuDemand = 0.0
/**
* The virtual machines running on the hypervisor.
@@ -139,121 +145,23 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The total number of guests.
+ * The [GuestListener] that listens for guest events.
*/
- private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.upDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU demand of the host.
- */
- private val _cpuDemand = meter.histogramBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU usage of the host.
- */
- private val _cpuUsage = meter.histogramBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The power usage of the host.
- */
- private val _powerUsage = meter.histogramBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The total amount of work supplied to the CPU.
- */
- private val _totalWork = meter.counterBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The work performed by the CPU.
- */
- private val _grantedWork = meter.counterBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount not performed by the CPU due to overcommitment.
- */
- private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of work not performed by the CPU due to interference.
- */
- private val _interferedWork = meter.counterBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("host.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The uptime of the host.
- */
- private val _upTime = meter.counterBuilder("host.time.up")
- .setDescription("The uptime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The downtime of the host.
- */
- private val _downTime = meter.counterBuilder("host.time.down")
- .setDescription("The downtime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
init {
// Launch hypervisor onto machine
scope.launch {
try {
+ _bootTime = clock.millis()
_state = HostState.UP
machine.run(this@SimHost.hypervisor, emptyMap())
} catch (_: CancellationException) {
@@ -265,29 +173,48 @@ public class SimHost(
_state = HostState.DOWN
}
}
- }
-
- private var _lastReport = clock.millis()
-
- private fun reportTime() {
- if (!scope.isActive)
- return
-
- val now = clock.millis()
- val duration = now - _lastReport
-
- _totalTime.add(duration)
- when (_state) {
- HostState.UP -> _upTime.add(duration)
- HostState.DOWN -> _downTime.add(duration)
- }
-
- // Track time of guests
- for (guest in guests.values) {
- guest.reportTime()
- }
- _lastReport = now
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback(::collectPowerTotal)
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectTime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
@@ -301,8 +228,17 @@ public class SimHost(
override suspend fun spawn(server: Server, start: Boolean) {
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- _guests.add(1)
- Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
}
if (start) {
@@ -326,7 +262,6 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- _guests.add(-1)
guest.terminate()
}
@@ -339,7 +274,7 @@ public class SimHost(
}
override fun close() {
- reportTime()
+ reset()
scope.cancel()
machine.close()
}
@@ -347,22 +282,35 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
+ reset()
+
for (guest in guests.values) {
guest.fail()
}
}
public suspend fun recover() {
- reportTime()
+ collectTime()
_state = HostState.UP
+ _bootTime = clock.millis()
+
for (guest in guests.values) {
guest.start()
}
}
/**
+ * Reset the machine.
+ */
+ private fun reset() {
+ collectTime()
+
+ _state = HostState.DOWN
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ }
+
+ /**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
@@ -374,147 +322,168 @@ public class SimHost(
return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStart(vm: Guest) {
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val STATE_KEY = AttributeKey.stringKey("state")
- private fun onGuestStop(vm: Guest) {
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to collect the guest counts on this host.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- var state: ServerState = ServerState.TERMINATED
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("guest.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The uptime of the guest.
- */
- private val _runningTime = meter.counterBuilder("guest.time.running")
- .setDescription("The uptime of the guest")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The time the guest is in an error state.
- */
- private val _errorTime = meter.counterBuilder("guest.time.error")
- .setDescription("The time the guest is in an error state")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ for ((_, guest) in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ else -> invalid++
}
}
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
- suspend fun terminate() {
- stop()
- machine.close()
- state = ServerState.DELETED
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
+
+ for (guest in guests.values) {
+ guest.collectCpuLimit(result)
}
+ }
- suspend fun fail() {
- if (state != ServerState.RUNNING) {
- return
- }
- stop()
- state = ServerState.ERROR
+ private var _lastCpuTimeCallback = clock.millis()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastCpuTimeCallback
+
+ try {
+ collectCpuTime(duration, result)
+ } finally {
+ _lastCpuTimeCallback = now
}
+ }
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- try {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- job = null
- }
- }
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = this.model.cpuCount
+ val d = coreCount / _cpuLimit
+
+ val counters = hypervisor.counters
+ val grantedWork = counters.actual
+ val overcommittedWork = counters.overcommit
+ val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+ val lostTime = (interferedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(lostTime, _lostState)
+
+ for (guest in guests.values) {
+ guest.collectCpuTime(duration, result)
}
+ }
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ private var _lastPowerCallback = clock.millis()
+ private var _totalPower = 0.0
+
+ /**
+ * Helper function to collect the total power usage of the machine.
+ */
+ private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastPowerCallback
+
+ _totalPower += duration / 1000.0 * machine.powerDraw
+ result.observe(_totalPower)
+
+ _lastPowerCallback = now
+ }
+
+ private var _lastReport = clock.millis()
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(result: ObservableLongMeasurement? = null) {
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ try {
+ collectTime(duration, result)
+ } finally {
+ _lastReport = now
}
+ }
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- onGuestStop(this)
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == HostState.UP) {
+ _uptime += duration
+ } else if (state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var _lastReport = clock.millis()
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
- fun reportTime() {
- if (state == ServerState.DELETED)
- return
+ for (guest in guests.values) {
+ guest.collectUptime(duration, result)
+ }
+ }
- val now = clock.millis()
- val duration = now - _lastReport
+ private var _bootTime = Long.MIN_VALUE
- _totalTime.add(duration)
- when (state) {
- ServerState.RUNNING -> _runningTime.add(duration)
- ServerState.ERROR -> _errorTime.add(duration)
- else -> {}
- }
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
- _lastReport = now
+ for (guest in guests.values) {
+ guest.collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..90562e2f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimAbstractMachine
+import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Terminate the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun terminate() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+ * Launch the guest on the simulated machine.
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Run the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "idle")
+ .build()
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = server.flavor.cpuCount
+ val d = coreCount / _cpuLimit
+
+ var grantedWork = 0.0
+ var overcommittedWork = 0.0
+
+ for (cpu in (machine as SimAbstractMachine).cpus) {
+ val counters = cpu.counters
+ grantedWork += counters.actual
+ overcommittedWork += counters.overcommit
+ }
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(0, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 31215e9a..9c879e5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,11 +23,9 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -44,15 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
private lateinit var machineModel: MachineModel
@@ -71,24 +74,29 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val virtDriver = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -130,26 +138,17 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
- metricsByName["cpu.work.overcommit"]?.let {
- overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -175,9 +174,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, requestedWork, "Requested work does not match") },
- { assertEquals(2107200, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(659, activeTime, "Active time does not match") },
+ { assertEquals(2342, idleTime, "Idle time does not match") },
+ { assertEquals(638, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -187,27 +186,32 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var totalTime = 0L
- var downTime = 0L
- var guestTotalTime = 0L
- var guestDownTime = 0L
-
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val host = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -233,35 +237,23 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
}
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["host.time.total"]?.let {
- totalTime = it.longSumData.points.first().value
- }
- metricsByName["host.time.down"]?.let {
- downTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.total"]?.let {
- guestTotalTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.error"]?.let {
- guestDownTime = it.longSumData.points.first().value
+
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
}
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -289,12 +281,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226039, requestedWork, "Total time does not match") },
- { assertEquals(1086039, grantedWork, "Down time does not match") },
- { assertEquals(1200001, totalTime, "Total time does not match") },
- { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
- { assertEquals(5000, downTime, "Down time does not match") },
- { assertEquals(5000, guestDownTime, "Guest down time does not match") },
+ { assertEquals(2661, idleTime, "Idle time does not match") },
+ { assertEquals(339, activeTime, "Active time does not match") },
+ { assertEquals(1195001, uptime, "Uptime does not match") },
+ { assertEquals(5000, downtime, "Downtime does not match") },
+ { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
)
}