From 3ca64e0110adab65526a0ccfd5b252e9f047ab10 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 14:41:05 +0200 Subject: refactor(telemetry): Create separate MeterProvider per service/host This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario. --- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 72 ++++++++++------------ .../org/opendc/web/runner/WebComputeMonitor.kt | 40 +++++------- 2 files changed, 48 insertions(+), 64 deletions(-) (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b565e90d..b9d5a3f5 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.env.MachineDef @@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario import org.opendc.web.client.model.Topology import java.io.File import java.net.URI +import java.time.Duration import java.util.* -import kotlin.random.Random -import kotlin.random.asJavaRandom import org.opendc.web.client.model.Portfolio as ClientPortfolio private val logger = KotlinLogging.logger {} @@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") { try { runBlockingSimulation { - val seed = repeat val workloadName = scenario.trace.traceId val workloadFraction = scenario.trace.loadSamplingFraction - val seeder = Random(seed) + val seeder = Random(repeat.toLong()) val meterProvider: MeterProvider = SdkMeterProvider .builder() .setClock(clock.toOtelClock()) .build() - val metricProducer = meterProvider as MetricProducer val operational = scenario.operationalPhenomena - val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder) + val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) val trace = ParquetTraceReader( listOf(traceReader), Workload(workloadName, workloadFraction), - seed + repeat ) - val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0 - - withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> - val faultInjector = if (failureFrequency > 0) { - logger.debug { "ENABLING failures" } - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - failureFrequency, - ) - } else { + val failureModel = + if (operational.failuresEnabled) + grid5000(Duration.ofDays(7), repeat) + else null - } - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + interferenceModel.takeIf { operational.performanceInterferenceEnabled } + ) - processTrace( - clock, - trace, - scheduler, - monitor - ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - faultInjector?.close() - } + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index c8e58dde..4b813310 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -22,27 +22,19 @@ package org.opendc.web.runner -import mu.KotlinLogging -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServiceData import kotlin.math.max +import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ -public class WebComputeMonitor : ComputeMonitor { - private val logger = KotlinLogging.logger {} - - override fun onStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - +class WebComputeMonitor : ComputeMonitor { override fun record(data: HostData) { - val duration = 5 * 60 * 1000L - val slices = duration / SLICE_LENGTH + val duration = data.uptime + val slices = data.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( hostAggregateMetrics.totalWork + data.totalWork, @@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices ) - hostMetrics.compute(data.host) { _, prev -> + hostMetrics.compute(data.host.id) { _, prev -> HostMetrics( - (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), data.instanceCount + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() + private val hostMetrics: MutableMap = mutableMapOf() private val SLICE_LENGTH: Long = 5 * 60 * 1000 data class AggregateHostMetrics( @@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor { val totalOvercommittedWork: Double = 0.0, val totalInterferedWork: Double = 0.0, val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, ) data class HostMetrics( @@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor { ) } - public data class AggregateServiceMetrics( + data class AggregateServiceMetrics( val vmTotalCount: Int = 0, val vmWaitingCount: Int = 0, val vmActiveCount: Int = 0, @@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor { val vmFailedCount: Int = 0 ) - public fun getResult(): Result { + fun getResult(): Result { return Result( hostAggregateMetrics.totalWork, hostAggregateMetrics.totalGrantedWork, @@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor { hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), serviceMetrics.vmTotalCount, serviceMetrics.vmWaitingCount, serviceMetrics.vmInactiveCount, -- cgit v1.2.3