author     Fabian Mastenbroek <mail.fabianm@gmail.com>   2021-09-14 14:41:05 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>   2021-09-17 16:48:53 +0200
commit     3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch)
tree       9cad172501154d01b00a1e5c45d94d7e2f1e5698 /opendc-web
parent     5f0b6b372487d79594cf59010822e160f351e0be (diff)
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but it allows us to attach resource information to each MeterProvider, as we would in a real-world scenario.
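
For illustration, the pattern this change moves toward, one provider per host with resource attributes attached, looks roughly like the sketch below. This is a hypothetical example, not the OpenDC API: HostSpec, createHostMeterProvider and the host.id/host.name attribute keys are made up for the illustration; the cast of SdkMeterProvider to MetricProducer matches the alpha metrics SDK this code builds against (see the removed cast in Main.kt below).

import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import io.opentelemetry.sdk.resources.Resource

// Hypothetical host descriptor; stands in for OpenDC's host model.
data class HostSpec(val id: String, val name: String)

// Build one MeterProvider per host, attaching the host identity as
// OpenTelemetry resource attributes so every metric it produces is
// labelled with its origin.
fun createHostMeterProvider(host: HostSpec): SdkMeterProvider {
    val resource = Resource.getDefault().merge(
        Resource.create(
            Attributes.builder()
                .put("host.id", host.id)
                .put("host.name", host.name)
                .build()
        )
    )
    return SdkMeterProvider.builder()
        .setResource(resource)
        .build()
}

fun main() {
    val hosts = listOf(HostSpec("host-0", "node-a"), HostSpec("host-1", "node-b"))
    // In the alpha metrics SDK, SdkMeterProvider can be cast to MetricProducer,
    // so a single reader can poll every provider for its current metrics.
    val producers: List<MetricProducer> = hosts.map { createHostMeterProvider(it) as MetricProducer }
    producers.forEach { println(it.collectAllMetrics().size) }
}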
Diffstat (limited to 'opendc-web')
-rw-r--r--  opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt               | 72
-rw-r--r--  opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt  | 40
2 files changed, 48 insertions(+), 64 deletions(-)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index b565e90d..b9d5a3f5 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.env.MachineDef
@@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
+import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario
import org.opendc.web.client.model.Topology
import java.io.File
import java.net.URI
+import java.time.Duration
import java.util.*
-import kotlin.random.Random
-import kotlin.random.asJavaRandom
import org.opendc.web.client.model.Portfolio as ClientPortfolio
private val logger = KotlinLogging.logger {}
@@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
val results = (0 until targets.repeatsPerScenario).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
- val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
}
}
@@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
runBlockingSimulation {
- val seed = repeat
val workloadName = scenario.trace.traceId
val workloadFraction = scenario.trace.loadSamplingFraction
- val seeder = Random(seed)
+ val seeder = Random(repeat.toLong())
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
.build()
- val metricProducer = meterProvider as MetricProducer
val operational = scenario.operationalPhenomena
- val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder)
+ val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
val trace = ParquetTraceReader(
listOf(traceReader),
Workload(workloadName, workloadFraction),
- seed
+ repeat
)
- val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
-
- withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
- val faultInjector = if (failureFrequency > 0) {
- logger.debug { "ENABLING failures" }
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seeder.nextInt(),
- failureFrequency,
- )
- } else {
+ val failureModel =
+ if (operational.failuresEnabled)
+ grid5000(Duration.ofDays(7), repeat)
+ else
null
- }
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector?.start()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environment.read(),
+ failureModel,
+ interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+ )
- processTrace(
- clock,
- trace,
- scheduler,
- monitor
- )
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
- faultInjector?.close()
- }
+ try {
+ simulator.run(trace)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
logger.debug {
"Finish " +
- "SUBMIT=${monitorResults.instanceCount} " +
- "FAIL=${monitorResults.failedInstanceCount} " +
- "QUEUE=${monitorResults.queuedInstanceCount} " +
- "RUNNING=${monitorResults.runningInstanceCount}"
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
}
}
} catch (cause: Throwable) {
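
Note on the Main.kt hunk above: the nested withComputeService/withMonitor blocks are replaced by a ComputeServiceSimulator that exposes one MetricProducer per service/host, all polled by a single CoroutineMetricReader. As a rough mental model, a coroutine-driven reader over multiple producers could look like the hypothetical sketch below; PollingMetricReader and its constructor are invented for illustration and are not the actual CoroutineMetricReader implementation.

import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import java.time.Duration

// Hypothetical reader: periodically collect the current metrics from every
// producer and hand each batch to an exporter callback.
class PollingMetricReader(
    scope: CoroutineScope,
    private val producers: List<MetricProducer>,
    private val export: (Collection<MetricData>) -> Unit,
    interval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
    private val job = scope.launch {
        while (isActive) {
            delay(interval.toMillis())
            for (producer in producers) {
                export(producer.collectAllMetrics())
            }
        }
    }

    // Stop polling; callers flush/close their exporter separately.
    override fun close() {
        job.cancel()
    }
}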
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
index c8e58dde..4b813310 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -22,27 +22,19 @@
package org.opendc.web.runner
-import mu.KotlinLogging
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import kotlin.math.max
+import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-public class WebComputeMonitor : ComputeMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun onStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
+class WebComputeMonitor : ComputeMonitor {
override fun record(data: HostData) {
- val duration = 5 * 60 * 1000L
- val slices = duration / SLICE_LENGTH
+ val duration = data.uptime
+ val slices = data.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
hostAggregateMetrics.totalWork + data.totalWork,
@@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor {
hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
hostAggregateMetrics.totalInterferedWork + data.overcommittedWork,
hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ hostAggregateMetrics.totalFailureSlices + slices,
+ hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices
)
- hostMetrics.compute(data.host) { _, prev ->
+ hostMetrics.compute(data.host.id) { _, prev ->
HostMetrics(
- (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ data.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ data.cpuDemand + (prev?.cpuDemand ?: 0.0),
data.instanceCount + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
@@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor {
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
private val SLICE_LENGTH: Long = 5 * 60 * 1000
data class AggregateHostMetrics(
@@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor {
val totalOvercommittedWork: Double = 0.0,
val totalInterferedWork: Double = 0.0,
val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
+ val totalFailureSlices: Double = 0.0,
+ val totalFailureVmSlices: Double = 0.0,
)
data class HostMetrics(
@@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor {
)
}
- public data class AggregateServiceMetrics(
+ data class AggregateServiceMetrics(
val vmTotalCount: Int = 0,
val vmWaitingCount: Int = 0,
val vmActiveCount: Int = 0,
@@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor {
val vmFailedCount: Int = 0
)
- public fun getResult(): Result {
+ fun getResult(): Result {
return Result(
hostAggregateMetrics.totalWork,
hostAggregateMetrics.totalGrantedWork,
@@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor {
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
+ hostAggregateMetrics.totalFailureSlices.roundToLong(),
+ hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
serviceMetrics.vmTotalCount,
serviceMetrics.vmWaitingCount,
serviceMetrics.vmInactiveCount,
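
A closing note on the WebComputeMonitor hunk: failure time is now taken directly from the telemetry (data.downtime) instead of being inferred from host state, and the failure-slice counters accumulate as Double and are rounded once in getResult(). The arithmetic below illustrates that accounting with made-up numbers; the fractional division is an assumption for the example (the diff itself divides two Longs).

import kotlin.math.roundToLong

// 5-minute slices in milliseconds, matching SLICE_LENGTH in the monitor.
const val SLICE_LENGTH_MS: Long = 5 * 60 * 1000L

fun main() {
    // Hypothetical downtime reported over three collection windows, in ms.
    val downtimes = listOf(150_000L, 0L, 450_000L) // 2.5 min, 0 min, 7.5 min

    // Accumulate fractional slices as Double and round once at the end,
    // rather than truncating to whole slices in every window.
    val totalSlices = downtimes.sumOf { it.toDouble() / SLICE_LENGTH_MS }
    println(totalSlices)               // 2.0  (0.5 + 0.0 + 1.5)
    println(totalSlices.roundToLong()) // 2
}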