diff options
8 files changed, 162 insertions, 160 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 824becf4..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,6 +22,8 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* @@ -114,69 +116,37 @@ internal class ComputeServiceImpl( private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. + * The number of scheduling attempts. */ - private val _submittedServers = meter.counterBuilder("servers.submitted") - .setDescription("Number of start requests") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _runningServers = meter.upDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") - .setUnit("1") - .build() - - /** - * The number of servers that have finished running. - */ - private val _finishedServers = meter.counterBuilder("servers.finished") - .setDescription("Number of servers that finished running") - .setUnit("1") - .build() - - /** - * The number of hosts registered at the compute service. + * The response time of the service. */ - private val _hostCount = meter.upDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of available hosts registered at the compute service. + * The number of servers that are pending. */ - private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() - - /** - * The response time of the service. - */ - private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -189,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -316,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -346,8 +327,7 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -395,7 +375,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -407,10 +387,10 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } server.state = ServerState.TERMINATED continue @@ -423,8 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, server.attributes) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -439,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -463,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -498,16 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.ERROR) { - _runningServers.add(-1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f7f9336e..3ec424f1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -153,11 +153,12 @@ abstract class Portfolio(name: String) : Experiment(name) { val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.activeHostCount}" + "Scheduler " + + "Success=${monitorResults.attemptsSuccess} " + + "Failure=${monitorResults.attemptsFailure} " + + "Error=${monitorResults.attemptsError} " + + "Pending=${monitorResults.serversPending} " + + "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index e1428fe7..29b48878 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -36,13 +36,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServiceData) { builder["timestamp"] = data.timestamp - builder["host_total_count"] = data.hostCount - builder["host_available_count"] = data.activeHostCount - builder["instance_total_count"] = data.instanceCount - builder["instance_active_count"] = data.runningInstanceCount - builder["instance_inactive_count"] = data.finishedInstanceCount - builder["instance_waiting_count"] = data.queuedInstanceCount - builder["instance_failed_count"] = data.failedInstanceCount + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError } override fun toString(): String = "service-writer" @@ -53,13 +53,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .namespace("org.opendc.telemetry.compute") .fields() .requiredLong("timestamp") - .requiredInt("host_total_count") - .requiredInt("host_available_count") - .requiredInt("instance_total_count") - .requiredInt("instance_active_count") - .requiredInt("instance_inactive_count") - .requiredInt("instance_waiting_count") - .requiredInt("instance_failed_count") + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index f4cf3e5e..81405acf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -104,19 +104,20 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -152,11 +153,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -202,11 +204,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -246,11 +249,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 1f309f1b..f3690ee8 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,6 +22,7 @@ package org.opendc.telemetry.compute +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData @@ -37,32 +38,48 @@ public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer * Extract a [ServiceData] object from the specified list of metric data. */ public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData { - var submittedVms = 0 - var queuedVms = 0 - var unscheduledVms = 0 - var runningVms = 0 - var finishedVms = 0 - var hosts = 0 - var availableHosts = 0 + val resultKey = AttributeKey.stringKey("result") + val stateKey = AttributeKey.stringKey("state") - for (metric in metrics) { - val points = metric.longSumData.points + var hostsUp = 0 + var hostsDown = 0 - if (points.isEmpty()) { - continue - } + var serversPending = 0 + var serversActive = 0 - val value = points.first().value.toInt() + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + + for (metric in metrics) { when (metric.name) { - "servers.submitted" -> submittedVms = value - "servers.waiting" -> queuedVms = value - "servers.unscheduled" -> unscheduledVms = value - "servers.active" -> runningVms = value - "servers.finished" -> finishedVms = value - "hosts.total" -> hosts = value - "hosts.available" -> availableHosts = value + "scheduler.hosts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[stateKey]) { + "up" -> hostsUp = point.value.toInt() + "down" -> hostsDown = point.value.toInt() + } + } + } + "scheduler.servers" -> { + for (point in metric.longSumData.points) { + when (point.attributes[stateKey]) { + "pending" -> serversPending = point.value.toInt() + "active" -> serversActive = point.value.toInt() + } + } + } + "scheduler.attempts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[resultKey]) { + "success" -> attemptsSuccess = point.value.toInt() + "failure" -> attemptsFailure = point.value.toInt() + "error" -> attemptsError = point.value.toInt() + } + } + } } } - return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms) + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index f6ff5db5..da2ebdf4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -27,11 +27,11 @@ package org.opendc.telemetry.compute.table */ public data class ServiceData( public val timestamp: Long, - public val hostCount: Int, - public val activeHostCount: Int, - public val instanceCount: Int, - public val runningInstanceCount: Int, - public val finishedInstanceCount: Int, - public val queuedInstanceCount: Int, - public val failedInstanceCount: Int + public val hostsUp: Int, + public val hostsDown: Int, + public val serversPending: Int, + public val serversActive: Int, + public val attemptsSuccess: Int, + public val attemptsFailure: Int, + public val attemptsError: Int ) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b9d5a3f5..960d5ebd 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -226,11 +226,12 @@ class RunnerCli : CliktCommand(name = "runner") { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index 4b813310..5f2c474b 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -81,11 +81,11 @@ class WebComputeMonitor : ComputeMonitor { override fun record(data: ServiceData) { serviceMetrics = AggregateServiceMetrics( - max(data.instanceCount, serviceMetrics.vmTotalCount), - max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount), - max(data.runningInstanceCount, serviceMetrics.vmActiveCount), - max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount), - max(data.failedInstanceCount, serviceMetrics.vmFailedCount), + max(data.attemptsSuccess, serviceMetrics.vmTotalCount), + max(data.serversPending, serviceMetrics.vmWaitingCount), + max(data.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(data.attemptsFailure, serviceMetrics.vmFailedCount), ) } |
