summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:48:51 +0100
committerGitHub <noreply@github.com>2021-03-27 12:48:51 +0100
commitbef2b2fc9ab97941613ec4537ebca1eb3fccdee6 (patch)
tree4a4c1e0fb2de6aa0a0b9810f9c255e039a7ce47b /simulator/opendc-experiments/opendc-experiments-capelin
parent57ebc8a1c6779d7e7276754838e83f1c026cceb9 (diff)
parent5b0eaf76ec00192c755b268b7655f6463c5bc62f (diff)
Integrate OpenTelemetry into OpenDC
This pull request resolves issue #91 by adopting the industry-standard OpenTelemetry standard for collecting traces and metrics in OpenDC simulations: * Metrics will now be exposed through OpenTelemetry's metrics API * `opendc-telemetry` provides complementary code to support gathering telemetry in OpenDC. **Breaking API Changes** * `opendc-tracer` has been removed. * `EventFlow` and all usages of it have been removed. * `opendc-experiments-sc18` has been removed for now, but a suitable replacement will follow soon.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt179
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt69
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt171
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt15
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt26
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt136
7 files changed, 373 insertions, 225 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 2d0da1bf..b2d7cc30 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -47,4 +47,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 44436019..40f50235 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,26 +22,19 @@
package org.opendc.experiments.capelin
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -51,9 +44,11 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Clock
+import kotlin.coroutines.coroutineContext
+import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -136,13 +131,13 @@ public fun createTraceReader(
/**
 * Construct the environment for a simulated compute service.
*/
-public fun createComputeService(
- coroutineScope: CoroutineScope,
+public suspend fun withComputeService(
clock: Clock,
+ meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
- eventTracer: EventTracer
-): ComputeServiceImpl {
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -151,33 +146,43 @@ public fun createComputeService(
def.name,
def.model,
def.meta,
- coroutineScope.coroutineContext,
+ coroutineContext,
clock,
+ meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
def.powerModel
)
}
+ val schedulerMeter = meterProvider.get("opendc-compute")
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+ ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
}
- return scheduler
+ try {
+ block(this, scheduler)
+ } finally {
+ scheduler.close()
+ hosts.forEach(SimHost::close)
+ }
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun attachMonitor(
- coroutineScope: CoroutineScope,
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
clock: Clock,
+ metricProducer: MetricProducer,
scheduler: ComputeService,
- monitor: ExperimentMonitor
-) {
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -186,45 +191,55 @@ public fun attachMonitor(
monitor.reportHostStateChange(clock.millis(), host, newState)
}
})
+ }
- host.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> monitor.reportHostSlice(
- clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.driver
- )
- }
- }
- .launchIn(coroutineScope)
+ val reader = CoroutineMetricReader(
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
+ )
- (host as SimHost).machine.powerDraw
- .onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(coroutineScope)
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
}
+}
- scheduler.events
- .onEach { event ->
- when (event) {
- is ComputeServiceEvent.MetricsAvailable ->
- monitor.reportProvisionerMetrics(clock.millis(), event)
- }
- }
- .launchIn(coroutineScope)
+public class ComputeMetrics {
+ public var submittedVms: Int = 0
+ public var queuedVms: Int = 0
+ public var runningVms: Int = 0
+ public var unscheduledVms: Int = 0
+ public var finishedVms: Int = 0
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+ val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ val res = ComputeMetrics()
+ try {
+ // Hack to extract metrics from OpenTelemetry SDK
+ res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Failed to collect metrics" }
+ }
+ return res
}
/**
* Process the trace.
*/
public suspend fun processTrace(
- coroutineScope: CoroutineScope,
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
@@ -233,44 +248,46 @@ public suspend fun processTrace(
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
try {
- var submitted = 0
+ coroutineScope {
+ while (reader.hasNext()) {
+ val entry = reader.next()
- while (reader.hasNext()) {
- val entry = reader.next()
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
- submitted++
- delay(max(0, entry.start - clock.millis()))
- coroutineScope.launch {
- chan.send(Unit)
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
+ delay(max(0, (entry.start - offset) - clock.millis()))
+ launch {
+ chan.send(Unit)
+ val server = client.newServer(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta
- )
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta
+ )
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
+ suspendCancellableCoroutine { cont ->
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
+
+ if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ cont.resume(Unit)
+ }
+ }
+ })
}
- })
+ }
}
}
- scheduler.events
- .takeWhile {
- when (it) {
- is ComputeServiceEvent.MetricsAvailable ->
- it.inactiveVmCount + it.failedVmCount != submitted
- }
- }
- .collect()
- delay(1)
+ yield()
} finally {
reader.close()
client.close()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index f9c96bb6..5fa77161 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -22,19 +22,15 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
-import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.compute.simulator.allocation.*
+import org.opendc.compute.service.scheduler.*
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -47,7 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
@@ -117,16 +113,19 @@ public abstract class Portfolio(name: String) : Experiment(name) {
* Perform a single trial for this portfolio.
*/
@OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val tracer = EventTracer(clock)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seeder = Random(repeat)
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createAllocationPolicy(seeder)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -152,15 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy,
- tracer
- )
-
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
@@ -175,31 +166,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
}
- try {
- testScope.advanceUntilIdle()
- } finally {
- monitor.close()
- }
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..799de60f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ */
+public class ExperimentMetricExporter(
+ private val monitor: ExperimentMonitor,
+ private val clock: Clock,
+ private val hosts: Map<String, Host>
+) : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ reportHostMetrics(metricsByName)
+ reportProvisionerMetrics(metricsByName)
+ return CompletableResultCode.ofSuccess()
+ }
+
+ private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ val hostMetrics = mutableMapOf<String, HostMetrics>()
+ hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+ mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+ m.cpuDemand = v
+ }
+
+ mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+ m.cpuUsage = v
+ }
+
+ mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+ m.powerDraw = v
+ }
+
+ mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+ m.requestedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+ m.grantedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+ m.overcommissionedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ m.interferedBurst = v.toLong()
+ }
+
+ mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+ m.numberOfDeployedImages = v.toInt()
+ }
+
+ for ((id, hostMetric) in hostMetrics) {
+ val host = hosts.getValue(id)
+ monitor.reportHostSlice(
+ clock.millis(),
+ hostMetric.requestedBurst,
+ hostMetric.grantedBurst,
+ hostMetric.overcommissionedBurst,
+ hostMetric.interferedBurst,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.numberOfDeployedImages,
+ host
+ )
+
+ monitor.reportPowerConsumption(host, hostMetric.powerDraw)
+ }
+ }
+
+ private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSummaryData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.sum)
+ }
+ }
+ }
+
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ val points = data?.longSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+ val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ }
+
+ private class HostMetrics {
+ var requestedBurst: Long = 0
+ var grantedBurst: Long = 0
+ var overcommissionedBurst: Long = 0
+ var interferedBurst: Long = 0
+ var cpuUsage: Double = 0.0
+ var cpuDemand: Double = 0.0
+ var numberOfDeployedImages: Int = 0
+ var powerDraw: Double = 0.0
+ }
+
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 14cc06dc..5e75c890 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,15 +24,13 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
-import java.io.Closeable
/**
* A monitor watches the events of an experiment.
*/
-public interface ExperimentMonitor : Closeable {
+public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked when the state of a VM changes.
*/
@@ -68,5 +66,14 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index c9d57a98..0e675d87 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerWriter.write(
ProvisionerEvent(
time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
)
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a812490a..02cfdc06 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,19 +22,18 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.AfterEach
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -45,9 +44,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
@@ -55,16 +53,6 @@ import java.time.Clock
@OptIn(ExperimentalCoroutinesApi::class)
class CapelinIntegrationTest {
/**
- * The [TestCoroutineScope] to use.
- */
- private lateinit var testScope: TestCoroutineScope
-
- /**
- * The simulation clock to use.
- */
- private lateinit var clock: Clock
-
- /**
* The monitor used to keep track of the metrics.
*/
private lateinit var monitor: TestExperimentReporter
@@ -74,38 +62,26 @@ class CapelinIntegrationTest {
*/
@BeforeEach
fun setUp() {
- testScope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(testScope)
-
monitor = TestExperimentReporter()
}
- /**
- * Tear down the experimental environment.
- */
- @AfterEach
- fun tearDown() = testScope.cleanupTestCoroutines()
-
@Test
- fun testLarge() {
+ fun testLarge() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: ComputeServiceImpl
- val tracer = EventTracer(clock)
-
- testScope.launch {
- scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy,
- tracer
- )
+ lateinit var monitorResults: ComputeMetrics
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -120,29 +96,28 @@ class CapelinIntegrationTest {
null
}
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
- monitor.close()
}
- runSimulation()
+ monitorResults = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
+            { assertEquals(0, monitorResults.unscheduledVms, "No VM should be unscheduled") },
+            { assertEquals(0, monitorResults.queuedVms, "No VM should be left in the queue") },
{ assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
{ assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
@@ -151,41 +126,33 @@ class CapelinIntegrationTest {
}
@Test
- fun testSmall() {
+ fun testSmall() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- val tracer = EventTracer(clock)
-
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy,
- tracer
- )
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- yield()
-
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
-
- scheduler.close()
- monitor.close()
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
}
- runSimulation()
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
// Note that these values have been verified beforehand
assertAll(
@@ -197,11 +164,6 @@ class CapelinIntegrationTest {
}
/**
- * Run the simulation.
- */
- private fun runSimulation() = testScope.advanceUntilIdle()
-
- /**
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {