| | | |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-27 12:48:51 +0100 |
| committer | GitHub <noreply@github.com> | 2021-03-27 12:48:51 +0100 |
| commit | bef2b2fc9ab97941613ec4537ebca1eb3fccdee6 (patch) | |
| tree | 4a4c1e0fb2de6aa0a0b9810f9c255e039a7ce47b /simulator/opendc-experiments/opendc-experiments-capelin | |
| parent | 57ebc8a1c6779d7e7276754838e83f1c026cceb9 (diff) | |
| parent | 5b0eaf76ec00192c755b268b7655f6463c5bc62f (diff) | |
Integrate OpenTelemetry into OpenDC
This pull request resolves issue #91 by adopting OpenTelemetry, the industry standard
for collecting traces and metrics, in OpenDC simulations:
* Metrics are now exposed through OpenTelemetry's metrics API.
* `opendc-telemetry` provides complementary code to support
gathering telemetry in OpenDC.
**Breaking API Changes**
* `opendc-tracer` has been removed.
* `EventFlow` and all usages of it have been removed.
* `opendc-experiments-sc18` has been removed for now, but a
suitable replacement will follow soon.
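For orientation, here is a minimal sketch of the metric-collection pattern these changes introduce, condensed from the Capelin integration test in the diff below. The helpers `withComputeService`, `withMonitor`, and `collectMetrics` come from the updated `ExperimentHelpers.kt`; the wrapper function name is arbitrary, and the environment reader, trace reader, and monitor are created as in that test and elided here, so treat this as an illustration of the flow rather than a complete, runnable experiment.

```kotlin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runBlockingTest
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.telemetry.sdk.toOtelClock

fun runCapelinExperiment() = runBlockingTest {
    // The simulation clock also drives the OpenTelemetry SDK, so metric
    // timestamps follow simulated time rather than wall-clock time.
    val clock = DelayControllerClockAdapter(this)
    val meterProvider: MeterProvider = SdkMeterProvider
        .builder()
        .setClock(clock.toOtelClock())
        .build()

    val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
    val chan = Channel<Unit>(Channel.CONFLATED)

    withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
        // withMonitor wires a CoroutineMetricReader and ExperimentMetricExporter to the
        // monitor, replacing the removed EventFlow-based listeners.
        withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
            processTrace(clock, traceReader, scheduler, chan, monitor)
        }
    }

    // Scheduler statistics are read back from the OpenTelemetry SDK after the run.
    val metrics = collectMetrics(meterProvider as MetricProducer)
    println("SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms}")
}
```

Compare this with the removed pattern visible in the deletions below, where `createComputeService` and `attachMonitor` were launched inside a `TestCoroutineScope` and scheduler statistics were read directly from `ComputeServiceImpl` fields.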
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin')
7 files changed, 373 insertions, 225 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 2d0da1bf..b2d7cc30 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -47,4 +47,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 44436019..40f50235 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,26 +22,19 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -51,9 +44,11 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Clock +import kotlin.coroutines.coroutineContext +import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -136,13 +131,13 @@ public fun createTraceReader( /** * Construct the environment for a simulated compute service.. 
*/ -public fun createComputeService( - coroutineScope: CoroutineScope, +public suspend fun withComputeService( clock: Clock, + meterProvider: MeterProvider, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, - eventTracer: EventTracer -): ComputeServiceImpl { + block: suspend CoroutineScope.(ComputeService) -> Unit +): Unit = coroutineScope { val hosts = environmentReader .use { it.read() } .map { def -> @@ -151,33 +146,43 @@ public fun createComputeService( def.name, def.model, def.meta, - coroutineScope.coroutineContext, + coroutineContext, clock, + meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel ) } + val schedulerMeter = meterProvider.get("opendc-compute") val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) } - return scheduler + try { + block(this, scheduler) + } finally { + scheduler.close() + hosts.forEach(SimHost::close) + } } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public fun attachMonitor( - coroutineScope: CoroutineScope, +public suspend fun withMonitor( + monitor: ExperimentMonitor, clock: Clock, + metricProducer: MetricProducer, scheduler: ComputeService, - monitor: ExperimentMonitor -) { + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + val monitorJobs = mutableSetOf<Job>() + // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -186,45 +191,55 @@ public fun attachMonitor( monitor.reportHostStateChange(clock.millis(), host, newState) } }) + } - host.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.driver - ) - } - } - .launchIn(coroutineScope) + val reader = CoroutineMetricReader( + this, + listOf(metricProducer), + ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), + exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ + ) - (host as SimHost).machine.powerDraw - .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(coroutineScope) + try { + block(this) + } finally { + monitorJobs.forEach(Job::cancel) + reader.close() + monitor.close() } +} - scheduler.events - .onEach { event -> - when (event) { - is ComputeServiceEvent.MetricsAvailable -> - monitor.reportProvisionerMetrics(clock.millis(), event) - } - } - .launchIn(coroutineScope) +public class ComputeMetrics { + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var unscheduledVms: Int = 0 + public var finishedVms: Int = 0 +} + +/** + * Collect the metrics of the compute service. 
+ */ +public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = ComputeMetrics() + try { + // Hack to extract metrics from OpenTelemetry SDK + res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + } catch (cause: Throwable) { + logger.warn(cause) { "Failed to collect metrics" } + } + return res } /** * Process the trace. */ public suspend fun processTrace( - coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -233,44 +248,46 @@ public suspend fun processTrace( ) { val client = scheduler.newClient() val image = client.newImage("vm-image") + var offset = Long.MIN_VALUE try { - var submitted = 0 + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() - while (reader.hasNext()) { - val entry = reader.next() + if (offset < 0) { + offset = entry.start - clock.millis() + } - submitted++ - delay(max(0, entry.start - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = client.newServer( - entry.name, - image, - client.newFlavor( + delay(max(0, (entry.start - offset) - clock.millis())) + launch { + chan.send(Unit) + val server = client.newServer( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta - ) + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + ) - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) + suspendCancellableCoroutine { cont -> + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) + + if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + cont.resume(Unit) + } + } + }) } - }) + } } } - scheduler.events - .takeWhile { - when (it) { - is ComputeServiceEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) + yield() } finally { reader.close() client.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f9c96bb6..5fa77161 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,19 +22,15 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import 
kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging -import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy -import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.compute.simulator.allocation.* +import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -47,7 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -117,16 +113,19 @@ public abstract class Portfolio(name: String) : Experiment(name) { * Perform a single trial for this portfolio. */ @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createAllocationPolicy(seeder) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -152,15 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy, - tracer - ) - + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( @@ -175,31 +166,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() } - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } + val monitorResults = collectMetrics(meterProvider as MetricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} 
FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } /** diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt new file mode 100644 index 00000000..799de60f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import org.opendc.compute.service.driver.Host +import java.time.Clock + +/** + * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. 
+ */ +public class ExperimentMetricExporter( + private val monitor: ExperimentMonitor, + private val clock: Clock, + private val hosts: Map<String, Host> +) : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + reportHostMetrics(metricsByName) + reportProvisionerMetrics(metricsByName) + return CompletableResultCode.ofSuccess() + } + + private fun reportHostMetrics(metrics: Map<String, MetricData>) { + val hostMetrics = mutableMapOf<String, HostMetrics>() + hosts.mapValuesTo(hostMetrics) { HostMetrics() } + + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> + m.cpuDemand = v + } + + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> + m.cpuUsage = v + } + + mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> + m.powerDraw = v + } + + mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> + m.requestedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> + m.grantedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> + m.overcommissionedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + m.interferedBurst = v.toLong() + } + + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> + m.numberOfDeployedImages = v.toInt() + } + + for ((id, hostMetric) in hostMetrics) { + val host = hosts.getValue(id) + monitor.reportHostSlice( + clock.millis(), + hostMetric.requestedBurst, + hostMetric.grantedBurst, + hostMetric.overcommissionedBurst, + hostMetric.interferedBurst, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.numberOfDeployedImages, + host + ) + + monitor.reportPowerConsumption(host, hostMetric.powerDraw) + } + } + + private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSummaryData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.sum) + } + } + } + + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { + val points = data?.longSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) { + val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = 
metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + } + + private class HostMetrics { + var requestedBurst: Long = 0 + var grantedBurst: Long = 0 + var overcommissionedBurst: Long = 0 + var interferedBurst: Long = 0 + var cpuUsage: Double = 0.0 + var cpuDemand: Double = 0.0 + var numberOfDeployedImages: Int = 0 + var powerDraw: Double = 0.0 + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 14cc06dc..5e75c890 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,15 +24,13 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState -import java.io.Closeable /** * A monitor watches the events of an experiment. */ -public interface ExperimentMonitor : Closeable { +public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked when the state of a VM changes. */ @@ -68,5 +66,14 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. 
*/ - public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index c9d57a98..0e675d87 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent @@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerWriter.write( ProvisionerEvent( time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount + totalHostCount, + availableHostCount, + totalVmCount, + activeVmCount, + inactiveVmCount, + waitingVmCount, + failedVmCount ) ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a812490a..02cfdc06 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,19 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.internal.ComputeServiceImpl import 
org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -45,9 +44,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. @@ -55,16 +53,6 @@ import java.time.Clock @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - - /** * The monitor used to keep track of the metrics. */ private lateinit var monitor: TestExperimentReporter @@ -74,38 +62,26 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeServiceImpl - val tracer = EventTracer(clock) - - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) + lateinit var monitorResults: ComputeMetrics + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -120,29 +96,28 @@ class CapelinIntegrationTest { null } - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No 
VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -151,41 +126,33 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - val tracer = EventTracer(clock) - - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - scheduler.close() - monitor.close() + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -197,11 +164,6 @@ class CapelinIntegrationTest { } /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - - /** * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { |