| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-25 21:50:45 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-26 15:41:05 +0100 |
| commit | 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (patch) | |
| tree | f0130622f189815e41837993b6f66ba3fc11b899 /simulator/opendc-runner-web/src/main/kotlin/org | |
| parent | 0d66ef47d6e1ec0861b4939800c5070f96600ca0 (diff) | |
compute: Integrate OpenTelemetry Metrics in OpenDC Compute
This change integrates the OpenTelemetry Metrics API into the OpenDC
Compute service implementation, replacing the previous bespoke
infrastructure for gathering metrics.
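
In outline, the new metric pipeline in Main.kt boils down to the wiring below. This is a condensed sketch, not a verbatim excerpt: `buildTelemetry` is a hypothetical helper, and the `simClock` parameter stands in for `DelayControllerClockAdapter(...).toOtelClock()`, the adapter from `org.opendc.telemetry.sdk` that exposes simulation time as an OpenTelemetry `Clock`.

```kotlin
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.common.Clock
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer

// Hypothetical helper mirroring the wiring this change adds in Main.kt.
fun buildTelemetry(simClock: Clock): Triple<MeterProvider, MetricProducer, Meter> {
    // Timestamp every recorded metric with simulated time, not wall-clock time.
    val meterProvider: MeterProvider = SdkMeterProvider
        .builder()
        .setClock(simClock)
        .build()

    // The SDK meter provider doubles as a MetricProducer, which the runner
    // polls after the simulation to collect the accumulated metrics.
    val metricProducer = meterProvider as MetricProducer

    // All compute-service instruments are created from this named Meter.
    val meter: Meter = meterProvider.get("opendc-compute")

    return Triple(meterProvider, metricProducer, meter)
}
```

Driving the meter provider from the simulator's virtual clock is what lets the collected metrics line up with simulated time rather than the wall-clock time of the test run.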
Diffstat (limited to 'simulator/opendc-runner-web/src/main/kotlin/org')

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt | 161 |
| -rw-r--r-- | simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt | 22 |

2 files changed, 96 insertions, 87 deletions
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 68ea3fb9..706efdc9 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,9 +34,13 @@ import com.mongodb.client.MongoClients
 import com.mongodb.client.MongoCollection
 import com.mongodb.client.MongoDatabase
 import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
 import mu.KotlinLogging
 import org.bson.Document
 import org.bson.types.ObjectId
@@ -45,17 +49,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
 import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
 import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
 import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.experiments.capelin.attachMonitor
-import org.opendc.experiments.capelin.createComputeService
-import org.opendc.experiments.capelin.createFailureDomain
+import org.opendc.experiments.capelin.*
 import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.processTrace
 import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
 import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
 import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
 import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
 import java.io.File
-import kotlin.coroutines.coroutineContext
 import kotlin.random.Random
 
 private val logger = KotlinLogging.logger {}
@@ -206,86 +207,86 @@ public class RunnerCli : CliktCommand(name = "runner") {
         traceReader: Sc20RawParquetTraceReader,
         performanceInterferenceReader: Sc20PerformanceInterferenceReader?
     ): WebExperimentMonitor.Result {
-        val seed = repeat
-        val traceDocument = scenario.get("trace", Document::class.java)
-        val workloadName = traceDocument.getString("traceId")
-        val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
-        val seeder = Random(seed)
-        val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job]))
-        val clock = DelayControllerClockAdapter(testScope)
-
-        val chan = Channel<Unit>(Channel.CONFLATED)
-
-        val operational = scenario.get("operational", Document::class.java)
-        val allocationPolicy =
-            when (val policyName = operational.getString("schedulerName")) {
-                "mem" -> AvailableMemoryAllocationPolicy()
-                "mem-inv" -> AvailableMemoryAllocationPolicy(true)
-                "core-mem" -> AvailableCoreMemoryAllocationPolicy()
-                "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
-                "active-servers" -> NumberOfActiveServersAllocationPolicy()
-                "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
-                "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
-                "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
-                "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
-                else -> throw IllegalArgumentException("Unknown policy $policyName")
-            }
-
-        val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
-        val trace = Sc20ParquetTraceReader(
-            listOf(traceReader),
-            performanceInterferenceModel,
-            Workload(workloadName, workloadFraction),
-            seed
-        )
-        val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
-        val environment = TopologyParser(topologies, topologyId)
         val monitor = WebExperimentMonitor()
 
-        testScope.launch {
-            val scheduler = createComputeService(
-                this,
-                clock,
-                environment,
-                allocationPolicy
-            )
-
-            val failureDomain = if (operational.getBoolean("failuresEnabled")) {
-                logger.debug("ENABLING failures")
-                createFailureDomain(
-                    testScope,
-                    clock,
-                    seeder.nextInt(),
-                    operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
-                    scheduler,
-                    chan
-                )
-            } else {
-                null
-            }
+        try {
+            runBlockingTest {
+                val seed = repeat
+                val traceDocument = scenario.get("trace", Document::class.java)
+                val workloadName = traceDocument.getString("traceId")
+                val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+                val seeder = Random(seed)
+                val clock = DelayControllerClockAdapter(this)
+
+                val chan = Channel<Unit>(Channel.CONFLATED)
+
+                val meterProvider: MeterProvider = SdkMeterProvider
+                    .builder()
+                    .setClock(clock.toOtelClock())
+                    .build()
+                val metricProducer = meterProvider as MetricProducer
+                val meter: Meter = meterProvider.get("opendc-compute")
+
+                val operational = scenario.get("operational", Document::class.java)
+                val allocationPolicy =
+                    when (val policyName = operational.getString("schedulerName")) {
+                        "mem" -> AvailableMemoryAllocationPolicy()
+                        "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+                        "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+                        "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+                        "active-servers" -> NumberOfActiveServersAllocationPolicy()
+                        "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+                        "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+                        "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+                        "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+                        else -> throw IllegalArgumentException("Unknown policy $policyName")
+                    }
 
-            val monitorResults = attachMonitor(this, clock, scheduler, monitor)
-            processTrace(
-                clock,
-                trace,
-                scheduler,
-                chan,
-                monitor
-            )
+                val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+                val trace = Sc20ParquetTraceReader(
+                    listOf(traceReader),
+                    performanceInterferenceModel,
+                    Workload(workloadName, workloadFraction),
+                    seed
+                )
+                val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+                val environment = TopologyParser(topologies, topologyId)
+                val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
+
+                withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+                    val failureDomain = if (failureFrequency > 0) {
+                        logger.debug { "ENABLING failures" }
+                        createFailureDomain(
+                            this,
+                            clock,
+                            seeder.nextInt(),
+                            failureFrequency,
+                            scheduler,
+                            chan
+                        )
+                    } else {
+                        null
+                    }
 
-            logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" }
+                    withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+                        processTrace(
+                            clock,
+                            trace,
+                            scheduler,
+                            chan,
+                            monitor
+                        )
+                    }
 
-            failureDomain?.cancel()
-            scheduler.close()
-        }
+                    failureDomain?.cancel()
+                }
 
-        try {
-            testScope.advanceUntilIdle()
-            testScope.uncaughtExceptions.forEach { it.printStackTrace() }
-        } finally {
-            monitor.close()
-            testScope.cancel()
+                val monitorResults = collectMetrics(metricProducer)
+                logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+            }
+        } catch (cause: Throwable) {
+            logger.warn(cause) { "Experiment failed" }
         }
 
         return monitor.getResult()
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index a8ac6c10..fcd43ea7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.runner.web
 import mu.KotlinLogging
 import org.opendc.compute.api.Server
 import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
 import org.opendc.compute.service.driver.Host
 import org.opendc.compute.service.driver.HostState
 import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor {
 
     private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
 
-    override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+    override fun reportProvisionerMetrics(
+        time: Long,
+        totalHostCount: Int,
+        availableHostCount: Int,
+        totalVmCount: Int,
+        activeVmCount: Int,
+        inactiveVmCount: Int,
+        waitingVmCount: Int,
+        failedVmCount: Int
+    ) {
         provisionerMetrics = AggregateProvisionerMetrics(
-            max(event.totalVmCount, provisionerMetrics.vmTotalCount),
-            max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
-            max(event.activeVmCount, provisionerMetrics.vmActiveCount),
-            max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
-            max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+            max(totalVmCount, provisionerMetrics.vmTotalCount),
+            max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+            max(activeVmCount, provisionerMetrics.vmActiveCount),
+            max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+            max(failedVmCount, provisionerMetrics.vmFailedCount),
         )
     }
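
On the monitor side, `reportProvisionerMetrics` now receives plain counters instead of a `ComputeServiceEvent.MetricsAvailable` object, which is what lets the `ComputeServiceEvent` import disappear. The sketch below restates the aggregation rule in isolation: `aggregate` is a hypothetical free function, and the data class is a stand-in whose fields are inferred from the constructor call visible in the hunk.

```kotlin
import kotlin.math.max

// Stand-in for the runner's aggregate record; field set inferred from the diff.
data class AggregateProvisionerMetrics(
    val vmTotalCount: Int = 0,
    val vmWaitingCount: Int = 0,
    val vmActiveCount: Int = 0,
    val vmInactiveCount: Int = 0,
    val vmFailedCount: Int = 0,
)

// Mirrors the new callback body: each report keeps the element-wise maximum,
// so the stored aggregate is the high-water mark of every VM counter over the
// run. (The host counts in the new signature are not aggregated in this hunk.)
fun aggregate(
    current: AggregateProvisionerMetrics,
    totalVmCount: Int,
    activeVmCount: Int,
    inactiveVmCount: Int,
    waitingVmCount: Int,
    failedVmCount: Int
): AggregateProvisionerMetrics = AggregateProvisionerMetrics(
    max(totalVmCount, current.vmTotalCount),
    max(waitingVmCount, current.vmWaitingCount),
    max(activeVmCount, current.vmActiveCount),
    max(inactiveVmCount, current.vmInactiveCount),
    max(failedVmCount, current.vmFailedCount),
)
```

Passing scalar parameters decouples the `ExperimentMonitor` interface from the compute service's internal event hierarchy, matching the move to OpenTelemetry as the single metrics pipeline.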
