author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-03-25 21:50:45 +0100
committer Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-03-26 15:41:05 +0100
commit    608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (patch)
tree      f0130622f189815e41837993b6f66ba3fc11b899 /simulator/opendc-runner-web/src/main/kotlin/org
parent    0d66ef47d6e1ec0861b4939800c5070f96600ca0 (diff)
compute: Integrate OpenTelemetry Metrics in OpenDC Compute
This change integrates the OpenTelemetry Metrics API into the OpenDC Compute Service implementation, replacing the old infrastructure for gathering metrics.
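
For orientation before the diff: the new telemetry wiring follows the standard OpenTelemetry SDK pattern of building a meter provider, obtaining a named Meter for instrumentation, and polling the provider through its MetricProducer view. A minimal sketch, assuming org.opendc.telemetry.sdk.toOtelClock extends java.time.Clock (the helper name setupTelemetry is illustrative):

    import io.opentelemetry.api.metrics.Meter
    import io.opentelemetry.sdk.metrics.SdkMeterProvider
    import io.opentelemetry.sdk.metrics.export.MetricProducer
    import org.opendc.telemetry.sdk.toOtelClock
    import java.time.Clock

    // Build the SDK meter provider on the simulation clock, hand out a Meter
    // for instrumentation, and keep the MetricProducer view to poll results.
    fun setupTelemetry(clock: Clock): Pair<Meter, MetricProducer> {
        val meterProvider = SdkMeterProvider.builder()
            .setClock(clock.toOtelClock()) // OpenDC's bridge to the OTel SDK clock
            .build()
        return meterProvider.get("opendc-compute") to (meterProvider as MetricProducer)
    }

Driving the SDK with the simulated clock matters here: metric timestamps then follow virtual simulation time rather than wall-clock time.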
Diffstat (limited to 'simulator/opendc-runner-web/src/main/kotlin/org')
-rw-r--r--  simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt                  | 161
-rw-r--r--  simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt  |  22
2 files changed, 96 insertions(+), 87 deletions(-)
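
The Main.kt diff also replaces the hand-managed TestCoroutineScope (with its explicit advanceUntilIdle() and cancel() bookkeeping) by runBlockingTest, which owns the scope lifecycle, advances virtual time until all coroutines finish, and rethrows uncaught exceptions. A minimal sketch of the pattern, assuming kotlinx-coroutines-test 1.4.x (the runExperiment name is illustrative):

    import kotlinx.coroutines.test.runBlockingTest
    import org.opendc.simulator.utils.DelayControllerClockAdapter

    fun runExperiment() = runBlockingTest {
        // `this` is a TestCoroutineScope; its virtual-time delay controller
        // drives the simulated java.time.Clock used throughout the run.
        val clock = DelayControllerClockAdapter(this)
        // ... build telemetry and the compute service, then process the trace;
        // runBlockingTest advances virtual time until all work completes.
    }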
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 68ea3fb9..706efdc9 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,9 +34,13 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
@@ -45,17 +49,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.experiments.capelin.attachMonitor
-import org.opendc.experiments.capelin.createComputeService
-import org.opendc.experiments.capelin.createFailureDomain
+import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.processTrace
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import kotlin.coroutines.coroutineContext
import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -206,86 +207,86 @@ public class RunnerCli : CliktCommand(name = "runner") {
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
): WebExperimentMonitor.Result {
- val seed = repeat
- val traceDocument = scenario.get("trace", Document::class.java)
- val workloadName = traceDocument.getString("traceId")
- val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
- val seeder = Random(seed)
- val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job]))
- val clock = DelayControllerClockAdapter(testScope)
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val operational = scenario.get("operational", Document::class.java)
- val allocationPolicy =
- when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
-
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
- listOf(traceReader),
- performanceInterferenceModel,
- Workload(workloadName, workloadFraction),
- seed
- )
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
- val environment = TopologyParser(topologies, topologyId)
val monitor = WebExperimentMonitor()
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy
- )
-
- val failureDomain = if (operational.getBoolean("failuresEnabled")) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- testScope,
- clock,
- seeder.nextInt(),
- operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
+ try {
+ runBlockingTest {
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val clock = DelayControllerClockAdapter(this)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val metricProducer = meterProvider as MetricProducer
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
+
+ withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ val failureDomain = if (failureFrequency > 0) {
+ logger.debug { "ENABLING failures" }
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" }
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
- failureDomain?.cancel()
- scheduler.close()
- }
+ failureDomain?.cancel()
+ }
- try {
- testScope.advanceUntilIdle()
- testScope.uncaughtExceptions.forEach { it.printStackTrace() }
- } finally {
- monitor.close()
- testScope.cancel()
+ val monitorResults = collectMetrics(metricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Experiment failed" }
}
return monitor.getResult()
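
Between the two files, note how the result bookkeeping moved: instead of reading counters off the value returned by attachMonitor, the runner now pulls the final figures from the SDK via collectMetrics(metricProducer). A hedged sketch of what such a helper can look like, assuming the OpenTelemetry SDK's MetricProducer.collectAllMetrics() (the snapshotMetrics name is illustrative):

    import io.opentelemetry.sdk.metrics.data.MetricData
    import io.opentelemetry.sdk.metrics.export.MetricProducer

    // Snapshot every instrument registered with the SDK, keyed by metric name.
    // OpenDC's collectMetrics presumably maps such MetricData points onto the
    // SUBMIT/FAIL/QUEUE/RUNNING figures logged above.
    fun snapshotMetrics(producer: MetricProducer): Map<String, MetricData> =
        producer.collectAllMetrics().associateBy { it.name }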
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index a8ac6c10..fcd43ea7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerMetrics = AggregateProvisionerMetrics(
- max(event.totalVmCount, provisionerMetrics.vmTotalCount),
- max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(event.activeVmCount, provisionerMetrics.vmActiveCount),
- max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+ max(totalVmCount, provisionerMetrics.vmTotalCount),
+ max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(activeVmCount, provisionerMetrics.vmActiveCount),
+ max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(failedVmCount, provisionerMetrics.vmFailedCount),
)
}
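
The second file drops the dependency on ComputeServiceEvent.MetricsAvailable: the monitor callback now receives the gauge values as plain parameters and aggregates the VM counts with max(), keeping the peak values observed over the run. A hedged usage sketch (all counts are made up):

    import java.time.Clock

    // Illustrative call site for the new flattened signature.
    fun report(monitor: WebExperimentMonitor, clock: Clock) {
        monitor.reportProvisionerMetrics(
            time = clock.millis(),
            totalHostCount = 4,
            availableHostCount = 4,
            totalVmCount = 100,
            activeVmCount = 12,
            inactiveVmCount = 80,
            waitingVmCount = 6,
            failedVmCount = 2,
        )
    }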