From 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 21:50:45 +0100 Subject: compute: Integrate OpenTelemetry Metrics in OpenDC Compute This change integrates the OpenTelemetry Metrics API in the OpenDC Compute Service implementation. This replaces the old infrastructure for gathering metrics. --- simulator/buildSrc/src/main/kotlin/Versions.kt | 5 + simulator/gradle.properties | 6 + .../opendc-compute-service/build.gradle.kts | 1 + .../org/opendc/compute/service/ComputeService.kt | 9 +- .../opendc/compute/service/ComputeServiceEvent.kt | 47 ------ .../compute/service/internal/ComputeServiceImpl.kt | 174 ++++++++++----------- .../opendc/compute/service/ComputeServiceTest.kt | 4 +- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 2 +- .../opendc-experiments-capelin/build.gradle.kts | 2 + .../experiments/capelin/ExperimentHelpers.kt | 131 ++++++++++++---- .../org/opendc/experiments/capelin/Portfolio.kt | 59 +++---- .../capelin/monitor/ExperimentMonitor.kt | 12 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 26 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 129 ++++++--------- .../sc18/UnderspecificationExperiment.kt | 2 + simulator/opendc-runner-web/build.gradle.kts | 2 + .../src/main/kotlin/org/opendc/runner/web/Main.kt | 161 +++++++++---------- .../org/opendc/runner/web/WebExperimentMonitor.kt | 22 ++- .../opendc/simulator/compute/SimAbstractMachine.kt | 7 +- .../simulator/compute/SimBareMetalMachine.kt | 1 + simulator/opendc-telemetry/build.gradle.kts | 21 +++ .../opendc-telemetry-api/build.gradle.kts | 34 ++++ .../opendc-telemetry-sdk/build.gradle.kts | 38 +++++ .../org/opendc/telemetry/sdk/OtelClockAdapter.kt | 39 +++++ .../sdk/metrics/export/CoroutineMetricReader.kt | 99 ++++++++++++ .../StageWorkflowSchedulerIntegrationTest.kt | 4 +- simulator/settings.gradle.kts | 2 + 27 files changed, 646 insertions(+), 393 deletions(-) delete mode 100644 
simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt create mode 100644 simulator/opendc-telemetry/build.gradle.kts create mode 100644 simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts create mode 100644 simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts create mode 100644 simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt create mode 100644 simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt (limited to 'simulator') diff --git a/simulator/buildSrc/src/main/kotlin/Versions.kt b/simulator/buildSrc/src/main/kotlin/Versions.kt index d1df6284..6aa9260b 100644 --- a/simulator/buildSrc/src/main/kotlin/Versions.kt +++ b/simulator/buildSrc/src/main/kotlin/Versions.kt @@ -49,6 +49,11 @@ public class Versions(private val project: Project) { val kotlinxCoroutines by version(name = "kotlinx-coroutines") + val otelApi by version(name = "opentelemetry-api") + val otelApiMetrics by version(name = "opentelemetry-api-metrics") + val otelSdk by version(name = "opentelemetry-sdk") + val otelSdkMetrics by version(name = "opentelemetry-sdk-metrics") + /** * Obtain the version for the specified [dependency][name]. 
diff --git a/simulator/gradle.properties b/simulator/gradle.properties index 8d41408c..99b08bb2 100644 --- a/simulator/gradle.properties +++ b/simulator/gradle.properties @@ -28,6 +28,12 @@ kotlin-logging.version = 2.0.6 slf4j.version = 1.7.30 log4j.version = 2.14.1 +# Dependencies (Telemetry) +opentelemetry-api.version = 1.0.1 +opentelemetry-api-metrics.version = 1.0.1-alpha +opentelemetry-sdk.version = 1.0.1 +opentelemetry-sdk-metrics.version = 1.0.1-alpha + # Dependencies (CLI) clikt.version = 3.1.0 progressbar.version = 0.9.0 diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts index 41b506b2..909e2dcd 100644 --- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -32,6 +32,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-api")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 28cef83a..4bc0ba78 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,6 +22,7 @@ package org.opendc.compute.service +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host @@ -34,11 +35,6 @@ import kotlin.coroutines.CoroutineContext * The [ComputeService] hosts the API implementation of the OpenDC Compute 
service. */ public interface ComputeService : AutoCloseable { - /** - * The events emitted by the service. - */ - public val events: Flow - /** * The hosts that are used by the compute service. */ @@ -80,10 +76,11 @@ public interface ComputeService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, + meter: Meter, allocationPolicy: AllocationPolicy, schedulingQuantum: Long = 300000, ): ComputeService { - return ComputeServiceImpl(context, clock, allocationPolicy, schedulingQuantum) + return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum) } } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt deleted file mode 100644 index 193008a7..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service - -/** - * An event that is emitted by the [ComputeService]. - */ -public sealed class ComputeServiceEvent { - /** - * The service that has emitted the event. - */ - public abstract val provisioner: ComputeService - - /** - * An event emitted for writing metrics. - */ - public data class MetricsAvailable( - override val provisioner: ComputeService, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int - ) : ComputeServiceEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f9bd7fbc..26a34ad9 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,18 +22,16 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import 
org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.utils.TimerScheduler -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -48,6 +46,7 @@ import kotlin.math.max internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, + private val meter: Meter, private val allocationPolicy: AllocationPolicy, private val schedulingQuantum: Long ) : ComputeService, HostListener { @@ -101,24 +100,70 @@ internal class ComputeServiceImpl( */ private val servers = mutableMapOf() - private var submittedVms: Int = 0 - private var queuedVms: Int = 0 - private var runningVms: Int = 0 - private var finishedVms: Int = 0 - private var unscheduledVms: Int = 0 - private var maxCores = 0 private var maxMemory = 0L + /** + * The number of servers that have been submitted to the service for provisioning. + */ + private val _submittedServers = meter.longCounterBuilder("servers.submitted") + .setDescription("Number of start requests") + .setUnit("1") + .build() + + /** + * The number of servers that failed to be scheduled. + */ + private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") + .setDescription("Number of unscheduled servers") + .setUnit("1") + .build() + + /** + * The number of servers that are waiting to be provisioned. + */ + private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") + .setDescription("Number of servers waiting to be provisioned") + .setUnit("1") + .build() + + /** + * The number of servers that are currently running. + */ + private val _runningServers = meter.longUpDownCounterBuilder("servers.active") + .setDescription("Number of servers currently running") + .setUnit("1") + .build() + + /** + * The number of servers that have finished running. 
+ */ + private val _finishedServers = meter.longCounterBuilder("servers.finished") + .setDescription("Number of servers that finished running") + .setUnit("1") + .build() + + /** + * The number of hosts registered at the compute service. + */ + private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") + .setDescription("Number of hosts") + .setUnit("1") + .build() + + /** + * The number of available hosts registered at the compute service. + */ + private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") + .setDescription("Number of available hosts") + .setUnit("1") + .build() + /** * The allocation logic to use. */ private val allocationLogic = allocationPolicy() - override val events: Flow - get() = _events - private val _events = EventFlow() - /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ @@ -204,18 +249,6 @@ internal class ComputeServiceImpl( start: Boolean ): Server { check(!isClosed) { "Client is closed" } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - ) - ) val uid = UUID(clock.millis(), random.nextLong()) val server = InternalServer( @@ -269,14 +302,23 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { + _availableHostCount.add(1) availableHosts += hv } + _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { - host.removeListener(this) + val view = hostToView.remove(host) + if (view != null) { + if (availableHosts.remove(view)) { + _availableHostCount.add(-1) + } + host.removeListener(this) + _hostCount.add(-1) + } } override fun close() { @@ -288,6 +330,8 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server) queue.add(request) + _submittedServers.add(1) + _waitingServers.add(1) requestSchedulingCycle() return request } @@ -332,6 +376,7 @@ internal 
class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _waitingServers.add(-1) continue } @@ -341,21 +386,10 @@ internal class ComputeServiceImpl( logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - --queuedVms, - ++unscheduledVms - ) - ) - // Remove the incoming image queue.poll() + _waitingServers.add(-1) + _unscheduledServers.add(1) logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") @@ -370,6 +404,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() + _waitingServers.add(-1) logger.info { "Assigned server $server to host $host." } @@ -384,19 +419,6 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) } catch (e: Throwable) { logger.error("Failed to deploy VM", e) @@ -427,21 +449,9 @@ internal class ComputeServiceImpl( if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv + _availableHostCount.add(1) } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - // Re-schedule on the new machine requestSchedulingCycle() } @@ -450,19 +460,7 @@ internal class ComputeServiceImpl( val hv = hostToView[host] ?: return availableHosts -= hv - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - 
runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) + _availableHostCount.add(-1) requestSchedulingCycle() } @@ -480,23 +478,15 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + if (newState == ServerState.RUNNING) { + _runningServers.add(1) + } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - activeServers -= server + _runningServers.add(-1) + _finishedServers.add(1) + val hv = hostToView[host] if (hv != null) { hv.provisionedCores -= server.flavor.cpuCount diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index e1482152..45a306aa 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -23,6 +23,7 @@ package org.opendc.compute.service import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.test.TestCoroutineScope @@ -55,7 +56,8 @@ internal class ComputeServiceTest { scope = TestCoroutineScope() val clock = DelayControllerClockAdapter(scope) val policy = AvailableMemoryAllocationPolicy() - service = ComputeService(scope.coroutineContext, clock, policy) + val meter = MeterProvider.noop().get("opendc-compute") + service = 
ComputeService(scope.coroutineContext, clock, meter, policy) } @AfterEach diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 2e4191cc..89784803 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -59,7 +59,7 @@ public class SimHost( /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context) + override val scope: CoroutineScope = CoroutineScope(context + Job()) /** * The logger instance of this server. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 2d0da1bf..b2d7cc30 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -47,4 +47,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 6f99a44e..4f48bba7 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,6 +22,11 @@ package 
org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.launchIn @@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener @@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Clock +import kotlin.coroutines.coroutineContext import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max @@ -130,12 +136,13 @@ public fun createTraceReader( /** * Construct the environment for a simulated compute service.. 
*/ -public fun createComputeService( - coroutineScope: CoroutineScope, +public suspend fun withComputeService( clock: Clock, + meter: Meter, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): ComputeService { + allocationPolicy: AllocationPolicy, + block: suspend CoroutineScope.(ComputeService) -> Unit +): Unit = coroutineScope { val hosts = environmentReader .use { it.read() } .map { def -> @@ -144,7 +151,7 @@ public fun createComputeService( def.name, def.model, def.meta, - coroutineScope.coroutineContext, + coroutineContext, clock, SimFairShareHypervisorProvider(), def.powerModel @@ -152,26 +159,33 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy) + ComputeService(coroutineContext, clock, meter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) } - return scheduler + try { + block(this, scheduler) + } finally { + scheduler.close() + hosts.forEach(SimHost::close) + } } /** * Attach the specified monitor to the VM provisioner. 
*/ @OptIn(ExperimentalCoroutinesApi::class) -public fun attachMonitor( - coroutineScope: CoroutineScope, +public suspend fun withMonitor( + monitor: ExperimentMonitor, clock: Clock, + metricProducer: MetricProducer, scheduler: ComputeService, - monitor: ExperimentMonitor -): MonitorResults { - val results = MonitorResults() + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + val monitorJobs = mutableSetOf() + // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -181,7 +195,7 @@ public fun attachMonitor( } }) - host.events + monitorJobs += host.events .onEach { event -> when (event) { is HostEvent.SliceFinished -> monitor.reportHostSlice( @@ -197,37 +211,81 @@ public fun attachMonitor( ) } } - .launchIn(coroutineScope) + .launchIn(this) - (host as SimHost).machine.powerDraw + monitorJobs += (host as SimHost).machine.powerDraw .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(coroutineScope) + .launchIn(this) } - scheduler.events - .onEach { event -> - when (event) { - is ComputeServiceEvent.MetricsAvailable -> { - results.submittedVms = event.totalVmCount - results.queuedVms = event.waitingVmCount - results.runningVms = event.activeVmCount - results.finishedVms = event.inactiveVmCount - results.unscheduledVms = event.failedVmCount - monitor.reportProvisionerMetrics(clock.millis(), event) - } + val reader = CoroutineMetricReader( + this, listOf(metricProducer), + object : MetricExporter { + override fun export(metrics: Collection): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + + val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = 
metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + return CompletableResultCode.ofSuccess() } - } - .launchIn(coroutineScope) - return results + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = 5 * 60 * 1000 + ) + + try { + block(this) + } finally { + monitorJobs.forEach(Job::cancel) + reader.close() + monitor.close() + } } -public class MonitorResults { +public class ComputeMetrics { public var submittedVms: Int = 0 public var queuedVms: Int = 0 public var runningVms: Int = 0 - public var finishedVms: Int = 0 public var unscheduledVms: Int = 0 + public var finishedVms: Int = 0 +} + +/** + * Collect the metrics of the compute service. 
+ */ +public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = ComputeMetrics() + try { + // Hack to extract metrics from OpenTelemetry SDK + res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + } catch (cause: Throwable) { + logger.warn(cause) { "Failed to collect metrics" } + } + return res } /** @@ -242,12 +300,17 @@ public suspend fun processTrace( ) { val client = scheduler.newClient() val image = client.newImage("vm-image") + var offset = Long.MIN_VALUE try { coroutineScope { while (reader.hasNext()) { val entry = reader.next() - delay(max(0, entry.start - clock.millis())) + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) val server = client.newServer( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 46e0bcb9..2921daba 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,11 +22,13 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider 
+import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload @@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { * Perform a single trial for this portfolio. */ @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel(Channel.CONFLATED) val allocationPolicy = createAllocationPolicy(seeder) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter = meterProvider.get("opendc-compute") + val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy - ) - + withComputeService(clock, meter, environment, 
allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( @@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${monitorResults.submittedVms}") - logger.debug("FAIL=${monitorResults.unscheduledVms}") - logger.debug("QUEUED=${monitorResults.queuedVms}") - logger.debug("RUNNING=${monitorResults.runningVms}") - logger.debug("FINISHED=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() } - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } + val monitorResults = collectMetrics(meterProvider as MetricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } /** diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 14cc06dc..a57c8d78 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import 
org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import java.io.Closeable @@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index c9d57a98..0e675d87 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent @@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { 
provisionerWriter.write( ProvisionerEvent( time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount + totalHostCount, + availableHostCount, + totalVmCount, + activeVmCount, + inactiveVmCount, + waitingVmCount, + failedVmCount ) ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a836b334..fd906f4d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,18 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload @@ -45,24 +45,14 @@ import 
org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. */ @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { - /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - /** * The monitor used to keep track of the metrics. */ @@ -73,37 +63,28 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeService - lateinit var monitorResults: MonitorResults + lateinit var monitorResults: ComputeMetrics - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -118,28 +99,28 @@ class CapelinIntegrationTest 
{ null } - monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -148,38 +129,35 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = 
createTestEnvironmentReader("single") - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") - - scheduler.close() - monitor.close() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -190,11 +168,6 @@ class CapelinIntegrationTest { ) } - /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - /** * Obtain the trace reader for the test. 
*/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 98e25be9..225200c9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.sc18 +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.service.ComputeService @@ -100,6 +101,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val compute = ComputeService( testScope.coroutineContext, clock, + MeterProvider.noop().get("opendc-compute"), NumberOfActiveServersAllocationPolicy(), ) diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index d07fe7a6..fcc78a83 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}") + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 68ea3fb9..706efdc9 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -34,9 +34,13 @@ import com.mongodb.client.MongoClients import 
com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId @@ -45,17 +49,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.experiments.capelin.attachMonitor -import org.opendc.experiments.capelin.createComputeService -import org.opendc.experiments.capelin.createFailureDomain +import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.processTrace import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import kotlin.coroutines.coroutineContext import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -206,86 +207,86 @@ public class RunnerCli : CliktCommand(name = "runner") { traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? 
): WebExperimentMonitor.Result { - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job])) - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) - val environment = TopologyParser(topologies, topologyId) val monitor = WebExperimentMonitor() - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - 
operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - scheduler, - chan - ) - } else { - null - } + try { + runBlockingTest { + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val clock = DelayControllerClockAdapter(this) + + val chan = Channel(Channel.CONFLATED) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val metricProducer = meterProvider as MetricProducer + val meter: Meter = meterProvider.get("opendc-compute") + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) 
+ val environment = TopologyParser(topologies, topologyId) + val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7 + + withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + val failureDomain = if (failureFrequency > 0) { + logger.debug { "ENABLING failures" } + createFailureDomain( + this, + clock, + seeder.nextInt(), + failureFrequency, + scheduler, + chan + ) + } else { + null + } - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" } + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } - failureDomain?.cancel() - scheduler.close() - } + failureDomain?.cancel() + } - try { - testScope.advanceUntilIdle() - testScope.uncaughtExceptions.forEach { it.printStackTrace() } - } finally { - monitor.close() - testScope.cancel() + val monitorResults = collectMetrics(metricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + } + } catch (cause: Throwable) { + logger.warn(cause) { "Experiment failed" } } return monitor.getResult() diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index a8ac6c10..fcd43ea7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.runner.web import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import 
org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerMetrics = AggregateProvisionerMetrics( - max(event.totalVmCount, provisionerMetrics.vmTotalCount), - max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), - max(event.activeVmCount, provisionerMetrics.vmActiveCount), - max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount), - max(event.failedVmCount, provisionerMetrics.vmFailedCount), + max(totalVmCount, provisionerMetrics.vmTotalCount), + max(waitingVmCount, provisionerMetrics.vmWaitingCount), + max(activeVmCount, provisionerMetrics.vmActiveCount), + max(inactiveVmCount, provisionerMetrics.vmInactiveCount), + max(failedVmCount, provisionerMetrics.vmFailedCount), ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 1c0f94fd..2127b066 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -108,8 +108,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine 
.launchIn(this) launch { - source.consume(consumer) - job.cancel() + try { + source.consume(consumer) + } finally { + job.cancel() + } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f86c4198..830ff70e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -71,6 +71,7 @@ public class SimBareMetalMachine( override fun close() { super.close() + scheduler.close() scope.cancel() } diff --git a/simulator/opendc-telemetry/build.gradle.kts b/simulator/opendc-telemetry/build.gradle.kts new file mode 100644 index 00000000..7edfd134 --- /dev/null +++ b/simulator/opendc-telemetry/build.gradle.kts @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts new file mode 100644 index 00000000..d9a4b4dd --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Telemetry API for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api("io.opentelemetry:opentelemetry-api:${versions.otelApi}") + api("io.opentelemetry:opentelemetry-api-metrics:${versions.otelApiMetrics}") +} diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts new file mode 100644 index 00000000..350a0f74 --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Telemetry SDK for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-telemetry:opendc-telemetry-api")) + api("org.jetbrains.kotlinx:kotlinx-coroutines-core") + api("io.opentelemetry:opentelemetry-sdk:${versions.otelSdk}") + api("io.opentelemetry:opentelemetry-sdk-metrics:${versions.otelSdkMetrics}") + + implementation("io.github.microutils:kotlin-logging") +} diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt new file mode 100644 index 00000000..86f6647e --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.sdk + +import io.opentelemetry.sdk.common.Clock + +/** + * An adapter class that bridges a [java.time.Clock] to a [Clock], converting its millisecond readings to nanoseconds. + */ +public class OtelClockAdapter(private val clock: java.time.Clock) : Clock { + override fun now(): Long = clock.millis() * 1_000_000L + + override fun nanoTime(): Long = clock.millis() * 1_000_000L +} + +/** + * Convert the specified [java.time.Clock] to a [Clock]. + */ +public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this) diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt new file mode 100644 index 00000000..9ee16fac --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software.
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.sdk.metrics.export + +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +/** + * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every + * export interval. + * + * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param producers The metric producers to gather metrics from. + * @param exporter The exporter to export the metrics to. + * @param exportInterval The export interval in milliseconds. + */ +public class CoroutineMetricReader( + scope: CoroutineScope, + private val producers: List<MetricProducer>, + private val exporter: MetricExporter, + private val exportInterval: Long = 60_000 +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) + + /** + * The metric reader job.
+ */ + private val readerJob = scope.launch { + while (isActive) { + delay(exportInterval) + + val metrics = mutableListOf<MetricData>() + for (producer in producers) { + metrics.addAll(producer.collectAllMetrics()) + } + chan.send(Collections.unmodifiableList(metrics)) + } + } + + /** + * The exporter job runs in the background to actually export the metrics. + */ + private val exporterJob = chan.consumeAsFlow() + .onEach { metrics -> + suspendCoroutine<Unit> { cont -> + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } + } + cont.resume(Unit) + } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + cont.resume(Unit) + } + } + } + .launchIn(scope) + + override fun close() { + readerJob.cancel() + exporterJob.cancel() + } +} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index 91b22266..5e276edf 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -85,7 +86,8 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val compute = ComputeService(testScope.coroutineContext, clock, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + val meter = MeterProvider.noop().get("opendc-compute") + val compute = ComputeService(testScope.coroutineContext, clock,
meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) hosts.forEach { compute.addHost(it) } diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index d5603664..73b4d8e7 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -39,5 +39,7 @@ include(":opendc-simulator:opendc-simulator-resources") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-trace:opendc-trace-core") +include(":opendc-telemetry:opendc-telemetry-api") +include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-harness") include(":opendc-utils") -- cgit v1.2.3