summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-service
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-14 15:38:38 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 16:51:07 +0200
commit8d899e29dbd757f6df320212d6e0d77ce8216ab9 (patch)
tree0dc2cd6d7c3fee35c32552848b46532265594cea /opendc-compute/opendc-compute-service
parent3ca64e0110adab65526a0ccfd5b252e9f047ab10 (diff)
refactor(telemetry): Standardize compute scheduler metrics
This change updates the OpenDC compute service implementation with multiple meters that follow the OpenTelemetry conventions.
Diffstat (limited to 'opendc-compute/opendc-compute-service')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt137
1 files changed, 58 insertions, 79 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 824becf4..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,6 +22,8 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
@@ -114,69 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
+ * The number of scheduling attempts.
*/
- private val _submittedServers = meter.counterBuilder("servers.submitted")
- .setDescription("Number of start requests")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.counterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.upDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _runningServers = meter.upDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that have finished running.
- */
- private val _finishedServers = meter.counterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
- .setUnit("1")
- .build()
-
- /**
- * The number of hosts registered at the compute service.
+ * The response time of the service.
*/
- private val _hostCount = meter.upDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of available hosts registered at the compute service.
+ * The number of servers that are pending.
*/
- private val _availableHostCount = meter.upDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
-
- /**
- * The response time of the service.
- */
- private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
- .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
- .ofLongs()
- .setUnit("ms")
- .build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -189,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -316,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -346,8 +327,7 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -395,7 +375,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -407,10 +387,10 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
server.state = ServerState.TERMINATED
continue
@@ -423,8 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
- _schedulerDuration.record(now - request.submitTime, server.attributes)
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -439,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -463,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -498,16 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.ERROR) {
- _runningServers.add(-1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {