diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-04 16:24:53 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 18:37:36 +0200 |
| commit | c7eec7904e08029b3ab31d3e7b21afa1ea9ab7e6 (patch) | |
| tree | 459724b394f5aca35733582a024fd5d99d06a7a4 /opendc-compute/opendc-compute-workload/src | |
| parent | a9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff) | |
refactor(compute/service): Remove OpenTelemetry from "compute" modules
This change removes the OpenTelemetry integration from the OpenDC
Compute modules. Previously, we chose to integrate OpenTelemetry to
provide a unified way to report metrics to the users.
Although this worked as expected, the overhead of OpenTelemetry when
collecting metrics during simulation was considerable and left no further
optimization opportunities (other than providing a separate API
implementation). Furthermore, since we were tied to OpenTelemetry's SDK
implementation, we experienced issues with throttling and registering
multiple instruments.
We will instead use another approach, where we expose the core metrics
in OpenDC via specialized interfaces (see the commits before) such that
access is fast and can be done without having to interface with
OpenTelemetry. In addition, we will provide an adapter that is able
to forward these metrics to OpenTelemetry implementations, so we can
still integrate with the wider ecosystem.
Diffstat (limited to 'opendc-compute/opendc-compute-workload/src')
17 files changed, 824 insertions, 177 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 21cfdad2..fddb4890 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -30,7 +30,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.telemetry.TelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -46,7 +45,6 @@ import kotlin.math.max * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. - * @param telemetry Helper class for managing telemetry. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. @@ -55,7 +53,6 @@ import kotlin.math.max public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, - private val telemetry: TelemetryManager, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, @@ -167,7 +164,6 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. 
*/ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val meterProvider = telemetry.createMeterProvider(spec) val host = SimHost( spec.uid, spec.name, @@ -175,7 +171,6 @@ public class ComputeServiceHelper( spec.meta, context, _engine, - meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, interferenceDomain = interferenceModel?.newDomain(), @@ -202,7 +197,6 @@ public class ComputeServiceHelper( * Construct a [ComputeService] instance. */ private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - val meterProvider = telemetry.createMeterProvider(scheduler) - return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeService(context, clock, scheduler, schedulingQuantum) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index 6c515118..af4dad44 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,10 +22,10 @@ package org.opendc.compute.workload.export.parquet -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 0d5b6b34..e6e7e42d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 5d11629b..082c7c88 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 5ad3b95e..2a0fdca1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,12 +22,11 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.context.ContextKey.named import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..45bd9ab1 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and 
this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.workload.telemetry.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List<Server>, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. 
+ */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. 
+ */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. 
+ */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var 
_uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. 
+ */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. 
+ */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt index b67050ce..36a2079a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt @@ -22,21 +22,26 @@ package org.opendc.compute.workload.telemetry -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import 
org.opendc.compute.workload.telemetry.table.ServiceTableReader /** - * Helper class to manage the telemetry for a [ComputeServiceHelper] instance. + * A monitor that tracks the metrics and events of the OpenDC Compute service. */ -public interface TelemetryManager { +public interface ComputeMonitor { /** - * Construct a [MeterProvider] for the specified [ComputeScheduler]. + * Record an entry with the specified [reader]. */ - public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider + public fun record(reader: ServerTableReader) {} /** - * Construct a [MeterProvider] for the specified [HostSpec]. + * Record an entry with the specified [reader]. */ - public fun createMeterProvider(host: HostSpec): MeterProvider + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt deleted file mode 100644 index 478c0609..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock - -/** - * A [TelemetryManager] using the OpenTelemetry Java SDK. - */ -public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable { - /** - * The [SdkMeterProvider]s that belong to the workload runner. - */ - private val _meterProviders = mutableListOf<SdkMeterProvider>() - - /** - * The internal [MetricProducer] registered with the runner. - */ - private val _metricProducers = mutableListOf<MetricProducer>() - - /** - * The list of [MetricReader]s that have been registered with the runner. 
- */ - private val _metricReaders = mutableListOf<MetricReader>() - - /** - * A [MetricProducer] that combines all the other metric producers. - */ - public val metricProducer: MetricProducer = object : MetricProducer { - private val producers = _metricProducers - - override fun collectAllMetrics(): Collection<MetricData> = producers.flatMap(MetricProducer::collectAllMetrics) - - override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer" - } - - /** - * Register a [MetricReader] for this manager. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - return createMeterProvider(resource) - } - - override fun createMeterProvider(host: HostSpec): MeterProvider { - val resource = Resource.builder() - .put(HOST_ID, host.uid.toString()) - .put(HOST_NAME, host.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, host.model.cpus.size) - .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size }) - .build() - - return createMeterProvider(resource) - } - - /** - * Construct a [SdkMeterProvider] for the specified [resource]. 
- */ - private fun createMeterProvider(resource: Resource): SdkMeterProvider { - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - _metricProducers.add(producer) - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - } - } - .build() - _meterProviders.add(meterProvider) - return meterProvider - } - - override fun close() { - for (meterProvider in _meterProviders) { - meterProvider.close() - } - - _meterProviders.clear() - - for (metricReader in _metricReaders) { - metricReader.shutdown() - } - - _metricReaders.clear() - _metricProducers.clear() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt index 4e7d0b75..5d383e40 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,9 @@ * SOFTWARE. 
*/ -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec +package org.opendc.compute.workload.telemetry.table /** - * A [TelemetryManager] that does nothing. + * Information about a host exposed to the telemetry service. */ -public class NoopTelemetryManager : TelemetryManager { - override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop() - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop() -} +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..8f6f0d01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. 
+ */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt new file mode 100644 index 00000000..111135b7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt new file mode 100644 index 00000000..bccccd01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the server since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the server since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the server (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. 
+ */ + public val cpuLostTime: Long +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..a1df6ea7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. 
+ */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..4211ab15 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. 
+ */ + public val attemptsError: Int +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt index dae03513..4344bb08 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -25,8 +25,8 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.HostTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt index 280f5ef8..8465871d 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.ServerInfo -import org.opendc.telemetry.compute.table.ServerTableReader +import 
org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.ServerInfo +import org.opendc.compute.workload.telemetry.table.ServerTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt index 7ffa7186..d91982bc 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.nio.file.Files import java.time.Instant |
