summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt98
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt19
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt)25
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt)46
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt73
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt)44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt120
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt81
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt72
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt92
-rw-r--r--opendc-experiments/opendc-experiments-energy21/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt)26
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt)84
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt)52
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt111
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt)26
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt)19
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt (renamed from opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt)28
-rw-r--r--opendc-web/opendc-web-runner/build.gradle.kts1
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt20
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt10
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt145
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt191
-rw-r--r--settings.gradle.kts1
26 files changed, 603 insertions, 801 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 65cebe1b..1a4caf91 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -38,6 +38,7 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 9d23a5dd..0230409e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,16 +24,10 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostListener
-import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.ReplayScheduler
@@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.EnvironmentReader
-import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
@@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
import kotlin.coroutines.resume
@@ -66,11 +58,6 @@ import kotlin.math.max
import kotlin.random.Random
/**
- * The logger for this experiment.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
* Construct the failure domain for the experiments.
*/
fun createFailureDomain(
@@ -169,85 +156,6 @@ suspend fun withComputeService(
}
/**
- * Attach the specified monitor to the VM provisioner.
- */
-suspend fun withMonitor(
- monitor: ExperimentMonitor,
- clock: Clock,
- metricProducer: MetricProducer,
- scheduler: ComputeService,
- block: suspend CoroutineScope.() -> Unit
-): Unit = coroutineScope {
- // Monitor host events
- for (host in scheduler.hosts) {
- monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
- host.addListener(object : HostListener {
- override fun onStateChanged(host: Host, newState: HostState) {
- monitor.reportHostStateChange(clock.millis(), host, newState)
- }
- })
- }
-
- val reader = CoroutineMetricReader(
- this,
- listOf(metricProducer),
- ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
- exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
- )
-
- try {
- block(this)
- } finally {
- reader.close()
- monitor.close()
- }
-}
-
-class ComputeMetrics {
- var submittedVms: Int = 0
- var queuedVms: Int = 0
- var runningVms: Int = 0
- var unscheduledVms: Int = 0
- var finishedVms: Int = 0
- var hosts: Int = 0
- var availableHosts = 0
-}
-
-/**
- * Collect the metrics of the compute service.
- */
-fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- return extractComputeMetrics(metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract an [ComputeMetrics] object from the specified list of metric data.
- */
-internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
- val res = ComputeMetrics()
- for (metric in metrics) {
- val points = metric.longSumData.points
-
- if (points.isEmpty()) {
- continue
- }
-
- val value = points.first().value.toInt()
- when (metric.name) {
- "servers.submitted" -> res.submittedVms = value
- "servers.waiting" -> res.queuedVms = value
- "servers.unscheduled" -> res.unscheduledVms = value
- "servers.active" -> res.runningVms = value
- "servers.finished" -> res.finishedVms = value
- "hosts.total" -> res.hosts = value
- "hosts.available" -> res.availableHosts = value
- }
- }
-
- return res
-}
-
-/**
* Process the trace.
*/
suspend fun processTrace(
@@ -255,7 +163,7 @@ suspend fun processTrace(
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
chan: Channel<Unit>,
- monitor: ExperimentMonitor? = null,
+ monitor: ComputeMonitor? = null,
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
@@ -289,7 +197,7 @@ suspend fun processTrace(
suspendCancellableCoroutine { cont ->
server.watch(object : ServerWatcher {
override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.reportVmStateChange(clock.millis(), server, newState)
+ monitor?.onStateChange(clock.millis(), server, newState)
if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index ee832af8..d3bba182 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.io.FileInputStream
import java.util.*
@@ -127,7 +129,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
- val monitor = ParquetExperimentMonitor(
+ val monitor = ParquetExportMonitor(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
@@ -148,7 +150,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -159,9 +161,16 @@ abstract class Portfolio(name: String) : Experiment(name) {
}
failureDomain?.cancel()
+ monitor.close()
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.activeHostCount}"
+ }
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index 897a6692..c5cb80e2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.experiments.capelin.telemetry.Event
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.Closeable
import java.io.File
@@ -36,20 +36,20 @@ import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread
/**
- * The logging instance to use.
+ * A writer that writes data in Parquet format.
*/
-private val logger = KotlinLogging.logger {}
-
-/**
- * A writer that writes events in Parquet format.
- */
-public open class ParquetEventWriter<in T : Event>(
+public open class ParquetDataWriter<in T>(
private val path: File,
private val schema: Schema,
private val converter: (T, GenericData.Record) -> Unit,
private val bufferSize: Int = 4096
) : Runnable, Closeable {
/**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The writer to write the Parquet file.
*/
private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
@@ -57,6 +57,7 @@ public open class ParquetEventWriter<in T : Event>(
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
/**
@@ -67,7 +68,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = "parquet-writer") { run() }
+ private val writerThread = thread(start = false, name = this.toString()) { run() }
/**
* Write the specified metrics to the database.
@@ -100,7 +101,7 @@ public open class ParquetEventWriter<in T : Event>(
is Action.Write<*> -> {
val record = GenericData.Record(schema)
@Suppress("UNCHECKED_CAST")
- converter(action.event as T, record)
+ converter(action.data as T, record)
writer.write(record)
}
}
@@ -121,6 +122,6 @@ public open class ParquetEventWriter<in T : Event>(
/**
* Write the specified metrics to the database.
*/
- public data class Write<out T : Event>(val event: T) : Action()
+ public data class Write<out T>(val data: T) : Action()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 7631f55f..79b84e9d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,22 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.experiments.capelin.export.parquet
-import org.opendc.compute.api.Server
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import java.io.File
/**
- * A periodic report of a virtual machine's metrics.
+ * A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public data class VmEvent(
- override val timestamp: Long,
- public val duration: Long,
- public val vm: Server,
- public val host: Server,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double
-) : Event("vm-metrics")
+public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(data: HostData) {
+ hostWriter.write(data)
+ }
+
+ override fun record(data: ServiceData) {
+ serviceWriter.write(data)
+ }
+
+ override fun close() {
+ hostWriter.close()
+ serviceWriter.close()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..8912c12e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.opendc.telemetry.compute.table.HostData
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostData]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+
+ override fun toString(): String = "host-writer"
+
+ public companion object {
+ private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
+ record.put("host_id", data.host.name)
+ record.put("state", data.host.state.name)
+ record.put("timestamp", data.timestamp)
+ record.put("total_work", data.totalWork)
+ record.put("granted_work", data.grantedWork)
+ record.put("overcommitted_work", data.overcommittedWork)
+ record.put("interfered_work", data.interferedWork)
+ record.put("cpu_usage", data.cpuUsage)
+ record.put("cpu_demand", data.cpuDemand)
+ record.put("power_draw", data.powerDraw)
+ record.put("instance_count", data.instanceCount)
+ record.put("cores", data.host.model.cpuCount)
+ }
+
+ private val schema: Schema = SchemaBuilder
+ .record("host")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .name("timestamp").type().longType().noDefault()
+ .name("host_id").type().stringType().noDefault()
+ .name("state").type().stringType().noDefault()
+ .name("requested_work").type().longType().noDefault()
+ .name("granted_work").type().longType().noDefault()
+ .name("overcommitted_work").type().longType().noDefault()
+ .name("interfered_work").type().longType().noDefault()
+ .name("cpu_usage").type().doubleType().noDefault()
+ .name("cpu_demand").type().doubleType().noDefault()
+ .name("power_draw").type().doubleType().noDefault()
+ .name("instance_count").type().intType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 8feff8d9..36d630f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -20,46 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry.parquet
+package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
+import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
- * A Parquet event writer for [ProvisionerEvent]s.
+ * A Parquet event writer for [ServiceData]s.
*/
-public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) {
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
- override fun toString(): String = "provisioner-writer"
+ override fun toString(): String = "service-writer"
public companion object {
- private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record ->
- record.put("timestamp", event.timestamp)
- record.put("host_total_count", event.totalHostCount)
- record.put("host_available_count", event.availableHostCount)
- record.put("vm_total_count", event.totalVmCount)
- record.put("vm_active_count", event.activeVmCount)
- record.put("vm_inactive_count", event.inactiveVmCount)
- record.put("vm_waiting_count", event.waitingVmCount)
- record.put("vm_failed_count", event.failedVmCount)
+ private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
+ record.put("timestamp", data.timestamp)
+ record.put("host_total_count", data.hostCount)
+ record.put("host_available_count", data.activeHostCount)
+ record.put("instance_total_count", data.instanceCount)
+ record.put("instance_active_count", data.runningInstanceCount)
+ record.put("instance_inactive_count", data.finishedInstanceCount)
+ record.put("instance_waiting_count", data.queuedInstanceCount)
+ record.put("instance_failed_count", data.failedInstanceCount)
}
private val schema: Schema = SchemaBuilder
- .record("provisioner_metrics")
- .namespace("org.opendc.experiments.sc20")
+ .record("service")
+ .namespace("org.opendc.telemetry.compute")
.fields()
.name("timestamp").type().longType().noDefault()
.name("host_total_count").type().intType().noDefault()
.name("host_available_count").type().intType().noDefault()
- .name("vm_total_count").type().intType().noDefault()
- .name("vm_active_count").type().intType().noDefault()
- .name("vm_inactive_count").type().intType().noDefault()
- .name("vm_waiting_count").type().intType().noDefault()
- .name("vm_failed_count").type().intType().noDefault()
+ .name("instance_total_count").type().intType().noDefault()
+ .name("instance_active_count").type().intType().noDefault()
+ .name("instance_inactive_count").type().intType().noDefault()
+ .name("instance_waiting_count").type().intType().noDefault()
+ .name("instance_failed_count").type().intType().noDefault()
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
deleted file mode 100644
index c49499da..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.monitor
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
-import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import java.io.File
-
-/**
- * The logger instance to use.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * An [ExperimentMonitor] that logs the events to a Parquet file.
- */
-public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
- private val hostWriter = ParquetHostEventWriter(
- File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
- private val provisionerWriter = ParquetProvisionerEventWriter(
- File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host
- ) {
- hostWriter.write(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerWriter.write(
- ProvisionerEvent(
- time,
- totalHostCount,
- availableHostCount,
- totalVmCount,
- activeVmCount,
- inactiveVmCount,
- waitingVmCount,
- failedVmCount
- )
- )
- }
-
- override fun close() {
- hostWriter.close()
- provisionerWriter.close()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
deleted file mode 100644
index c8fe1cb2..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [HostEvent]s.
- */
-public class ParquetHostEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "host-writer"
-
- public companion object {
- private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record ->
- // record.put("portfolio_id", event.run.parent.parent.id)
- // record.put("scenario_id", event.run.parent.id)
- // record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
- record.put("timestamp", event.timestamp)
- record.put("duration", event.duration)
- record.put("vm_count", event.vmCount)
- record.put("requested_burst", event.requestedBurst)
- record.put("granted_burst", event.grantedBurst)
- record.put("overcommissioned_burst", event.overcommissionedBurst)
- record.put("interfered_burst", event.interferedBurst)
- record.put("cpu_usage", event.cpuUsage)
- record.put("cpu_demand", event.cpuDemand)
- record.put("power_draw", event.powerDraw)
- record.put("cores", event.cores)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("host_metrics")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- // .name("portfolio_id").type().intType().noDefault()
- // .name("scenario_id").type().intType().noDefault()
- // .name("run_id").type().intType().noDefault()
- .name("timestamp").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("vm_count").type().intType().noDefault()
- .name("requested_burst").type().longType().noDefault()
- .name("granted_burst").type().longType().noDefault()
- .name("overcommissioned_burst").type().longType().noDefault()
- .name("interfered_burst").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("cores").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
deleted file mode 100644
index 946410eb..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.telemetry.parquet
-
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.opendc.experiments.capelin.telemetry.RunEvent
-import java.io.File
-
-/**
- * A Parquet event writer for [RunEvent]s.
- */
-public class ParquetRunEventWriter(path: File, bufferSize: Int) :
- ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) {
-
- override fun toString(): String = "run-writer"
-
- public companion object {
- private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record ->
- val portfolio = event.portfolio
- record.put("portfolio_name", portfolio.name)
- record.put("scenario_id", portfolio.id)
- record.put("run_id", event.repeat)
- record.put("topology", portfolio.topology.name)
- record.put("workload_name", portfolio.workload.name)
- record.put("workload_fraction", portfolio.workload.fraction)
- record.put("workload_sampler", portfolio.workload.samplingStrategy)
- record.put("allocation_policy", portfolio.allocationPolicy)
- record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency)
- record.put("interference", portfolio.operationalPhenomena.hasInterference)
- record.put("seed", event.repeat)
- }
-
- private val schema: Schema = SchemaBuilder
- .record("runs")
- .namespace("org.opendc.experiments.sc20")
- .fields()
- .name("portfolio_name").type().stringType().noDefault()
- .name("scenario_id").type().intType().noDefault()
- .name("run_id").type().intType().noDefault()
- .name("topology").type().stringType().noDefault()
- .name("workload_name").type().stringType().noDefault()
- .name("workload_fraction").type().doubleType().noDefault()
- .name("workload_sampler").type().stringType().noDefault()
- .name("allocation_policy").type().stringType().noDefault()
- .name("failure_frequency").type().doubleType().noDefault()
- .name("interference").type().booleanType().noDefault()
- .name("seed").type().intType().noDefault()
- .endRecord()
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 24d8f768..abd8efeb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
@@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.util.*
@@ -80,7 +82,6 @@ class CapelinIntegrationTest {
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
@@ -98,7 +99,7 @@ class CapelinIntegrationTest {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -111,15 +112,21 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
+ { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
+ { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
+ { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
{ assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
{ assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
{ assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
@@ -145,7 +152,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -156,8 +163,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -189,7 +202,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -200,8 +213,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -239,7 +258,7 @@ class CapelinIntegrationTest {
chan
)
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -252,8 +271,14 @@ class CapelinIntegrationTest {
failureDomain.cancel()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -283,34 +308,19 @@ class CapelinIntegrationTest {
return ClusterEnvironmentReader(stream)
}
- class TestExperimentReporter : ExperimentMonitor {
+ class TestExperimentReporter : ComputeMonitor {
var totalWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
var totalInterferedWork = 0L
var totalPowerDraw = 0.0
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host,
- ) {
- this.totalWork += totalWork.toLong()
- totalGrantedWork += grantedWork.toLong()
- totalOvercommittedWork += overcommittedWork.toLong()
- totalInterferedWork += interferedWork.toLong()
- totalPowerDraw += powerDraw
+ override fun record(data: HostData) {
+ this.totalWork += data.totalWork.toLong()
+ totalGrantedWork += data.grantedWork.toLong()
+ totalOvercommittedWork += data.overcommittedWork.toLong()
+ totalInterferedWork += data.interferedWork.toLong()
+ totalPowerDraw += data.powerDraw
}
-
- override fun close() {}
}
}
diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
index 7d34d098..40ac2967 100644
--- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.kotlin.logging)
implementation(libs.config)
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index e64e20a2..02aaab3c 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
-import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
+import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.time.Clock
import java.util.*
@@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
)
val meterProvider: MeterProvider = createMeterProvider(clock)
- val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
+ val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096)
val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace))
withComputeService(clock, meterProvider, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
}
}
- val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
logger.debug {
- "Finish SUBMIT=${monitorResults.submittedVms} " +
- "FAIL=${monitorResults.unscheduledVms} " +
- "QUEUE=${monitorResults.queuedVms} " +
- "RUNNING=${monitorResults.runningVms}"
+ "Finish SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index c29e116e..6a3de9bc 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,14 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+description = "Telemetry for OpenDC Compute"
-/**
- * An event that occurs within the system.
- */
-public abstract class Event(public val name: String) {
- /**
- * The time of occurrence of this event.
- */
- public abstract val timestamp: Long
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTelemetry.opendcTelemetrySdk)
+
+ implementation(projects.opendcCompute.opendcComputeSimulator)
+ implementation(libs.opentelemetry.semconv)
+ implementation(libs.kotlin.logging)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 79be9ac4..95e7ff9e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -20,40 +20,40 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.monitor
+package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
-import org.opendc.experiments.capelin.extractComputeMetrics
+import org.opendc.telemetry.compute.table.HostData
import java.time.Clock
/**
- * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ExperimentMetricExporter(
- private val monitor: ExperimentMonitor,
+public class ComputeMetricExporter(
private val clock: Clock,
- private val hosts: Map<String, Host>
+ private val hosts: Map<String, Host>,
+ private val monitor: ComputeMonitor
) : MetricExporter {
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
reportHostMetrics(metrics)
- reportProvisionerMetrics(metrics)
+ reportServiceMetrics(metrics)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
}
}
- private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
- private val hostMetricsSingleton = HostMetrics()
+ private var lastHostMetrics: Map<String, HBuffer> = emptyMap()
+ private val hostMetricsSingleton = HBuffer()
private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hostMetrics = mutableMapOf<String, HostMetrics>()
+ val hostMetrics = mutableMapOf<String, HBuffer>()
for (metric in metrics) {
when (metric.name) {
@@ -74,69 +74,59 @@ public class ExperimentMetricExporter(
val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
val host = hosts[id] ?: continue
- monitor.reportHostData(
- clock.millis(),
- hostMetric.totalWork - lastHostMetric.totalWork,
- hostMetric.grantedWork - lastHostMetric.grantedWork,
- hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
- hostMetric.interferedWork - lastHostMetric.interferedWork,
- hostMetric.cpuUsage,
- hostMetric.cpuDemand,
- hostMetric.powerDraw,
- hostMetric.instanceCount,
- hostMetric.uptime - lastHostMetric.uptime,
- hostMetric.downtime - lastHostMetric.downtime,
- host
+ monitor.record(
+ HostData(
+ clock.millis(),
+ host,
+ hostMetric.totalWork - lastHostMetric.totalWork,
+ hostMetric.grantedWork - lastHostMetric.grantedWork,
+ hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
+ hostMetric.interferedWork - lastHostMetric.interferedWork,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.instanceCount,
+ hostMetric.powerDraw,
+ hostMetric.uptime - lastHostMetric.uptime,
+ hostMetric.downtime - lastHostMetric.downtime,
+ )
)
}
lastHostMetrics = hostMetrics
}
- private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
val points = data.doubleSummaryData?.points ?: emptyList()
for (point in points) {
val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2
block(hostMetric, avg)
}
}
- private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) {
val points = data?.longSumData?.points ?: emptyList()
for (point in points) {
val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
block(hostMetric, point.value)
}
}
- private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) {
val points = data?.doubleSumData?.points ?: emptyList()
for (point in points) {
val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() }
+ val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() }
block(hostMetric, point.value)
}
}
- private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
- val res = extractComputeMetrics(metrics)
-
- monitor.reportServiceData(
- clock.millis(),
- res.hosts,
- res.availableHosts,
- res.submittedVms,
- res.runningVms,
- res.finishedVms,
- res.queuedVms,
- res.unscheduledVms
- )
- }
-
- private class HostMetrics {
+ /**
+ * A buffer for host metrics before they are reported.
+ */
+ private class HBuffer {
var totalWork: Double = 0.0
var grantedWork: Double = 0.0
var overcommittedWork: Double = 0.0
@@ -149,6 +139,10 @@ public class ExperimentMetricExporter(
var downtime: Long = 0
}
+ private fun reportServiceMetrics(metrics: Collection<MetricData>) {
+ monitor.record(extractServiceMetrics(clock.millis(), metrics))
+ }
+
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index dc28b816..ec303b37 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -20,56 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.monitor
+package org.opendc.telemetry.compute
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServiceData
/**
- * A monitor watches the events of an experiment.
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
*/
-public interface ExperimentMonitor : AutoCloseable {
+public interface ComputeMonitor {
/**
- * This method is invoked when the state of a VM changes.
+ * This method is invoked when the state of a [Server] changes.
*/
- public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+ public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {}
/**
- * This method is invoked when the state of a host changes.
+ * This method is invoked when the state of a [Host] changes.
*/
- public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
+ public fun onStateChange(time: Long, host: Host, newState: HostState) {}
/**
- * This method is invoked for a host for each slice that is finishes.
+ * Record the specified [data].
*/
- public fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host
- ) {}
+ public fun record(data: ServerData) {}
/**
- * This method is invoked for reporting service data.
+ * Record the specified [data].
*/
- public fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {}
+ public fun record(data: HostData) {}
+
+ /**
+ * Record the specified [data].
+ */
+ public fun record(data: ServiceData) {}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
new file mode 100644
index 00000000..d3d983b9
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import java.time.Clock
+
+/**
+ * Attach the specified monitor to the OpenDC Compute service.
+ */
+public suspend fun withMonitor(
+ scheduler: ComputeService,
+ clock: Clock,
+ metricProducer: MetricProducer,
+ monitor: ComputeMonitor,
+ exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ // Monitor host events
+ for (host in scheduler.hosts) {
+ monitor.onStateChange(clock.millis(), host, HostState.UP)
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, newState: HostState) {
+ monitor.onStateChange(clock.millis(), host, newState)
+ }
+ })
+ }
+
+ val reader = CoroutineMetricReader(
+ this,
+ listOf(metricProducer),
+ ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor),
+ exportInterval
+ )
+
+ try {
+ block(this)
+ } finally {
+ reader.close()
+ }
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+ return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
+}
+
+/**
+ * Extract a [ServiceData] object from the specified list of metric data.
+ */
+public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
+ var submittedVms = 0
+ var queuedVms = 0
+ var unscheduledVms = 0
+ var runningVms = 0
+ var finishedVms = 0
+ var hosts = 0
+ var availableHosts = 0
+
+ for (metric in metrics) {
+ val points = metric.longSumData.points
+
+ if (points.isEmpty()) {
+ continue
+ }
+
+ val value = points.first().value.toInt()
+ when (metric.name) {
+ "servers.submitted" -> submittedVms = value
+ "servers.waiting" -> queuedVms = value
+ "servers.unscheduled" -> unscheduledVms = value
+ "servers.active" -> runningVms = value
+ "servers.finished" -> finishedVms = value
+ "hosts.total" -> hosts = value
+ "hosts.available" -> availableHosts = value
+ }
+ }
+
+ return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index 899fc9b1..8e6c34d0 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,24 +20,24 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
import org.opendc.compute.service.driver.Host
/**
- * A periodic report of the host machine metrics.
+ * A trace entry for a particular host.
*/
-public data class HostEvent(
- override val timestamp: Long,
- public val duration: Long,
+public data class HostData(
+ public val timestamp: Long,
public val host: Host,
- public val vmCount: Int,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
+ public val totalWork: Double,
+ public val grantedWork: Double,
+ public val overcommittedWork: Double,
+ public val interferedWork: Double,
public val cpuUsage: Double,
public val cpuDemand: Double,
+ public val instanceCount: Int,
public val powerDraw: Double,
- public val cores: Int
-) : Event("host-metrics")
+ public val uptime: Long,
+ public val downtime: Long,
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 6c8fc941..2a9fa8a6 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,15 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
-import org.opendc.experiments.capelin.Portfolio
+import org.opendc.compute.api.Server
/**
- * A periodic report of the host machine metrics.
+ * A trace entry for a particular server.
*/
-public data class RunEvent(
- val portfolio: Portfolio,
- val repeat: Int,
- override val timestamp: Long
-) : Event("run")
+public data class ServerData(
+ public val timestamp: Long,
+ public val server: Server,
+ public val uptime: Long,
+ public val downtime: Long,
+)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index 539c9bc9..f6ff5db5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,18 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.telemetry
+package org.opendc.telemetry.compute.table
/**
- * A periodic report of the provisioner's metrics.
+ * A trace entry for the compute service.
*/
-public data class ProvisionerEvent(
- override val timestamp: Long,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
-) : Event("provisioner-metrics")
+public data class ServiceData(
+ public val timestamp: Long,
+ public val hostCount: Int,
+ public val activeHostCount: Int,
+ public val instanceCount: Int,
+ public val runningInstanceCount: Int,
+ public val finishedInstanceCount: Int,
+ public val queuedInstanceCount: Int,
+ public val failedInstanceCount: Int
+)
diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts
index ec4a4673..99051e8e 100644
--- a/opendc-web/opendc-web-runner/build.gradle.kts
+++ b/opendc-web/opendc-web-runner/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
implementation(projects.opendcExperiments.opendcExperimentsCapelin)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcTelemetry.opendcTelemetryCompute)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 53d50357..65527141 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -47,6 +47,8 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -131,7 +133,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebExperimentMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -176,8 +178,8 @@ class RunnerCli : CliktCommand(name = "runner") {
environment: EnvironmentReader,
traceReader: RawParquetTraceReader,
interferenceModel: VmInterferenceModel?
- ): WebExperimentMonitor.Result {
- val monitor = WebExperimentMonitor()
+ ): WebComputeMonitor.Result {
+ val monitor = WebComputeMonitor()
try {
runBlockingSimulation {
@@ -220,7 +222,7 @@ class RunnerCli : CliktCommand(name = "runner") {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -233,8 +235,14 @@ class RunnerCli : CliktCommand(name = "runner") {
failureDomain?.cancel()
}
- val monitorResults = collectMetrics(metricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
+ }
}
} catch (cause: Throwable) {
logger.warn(cause) { "Experiment failed" }
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index 4044cec9..e0e3488f 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -61,14 +61,14 @@ public class ScenarioManager(private val client: ApiClient) {
/**
* Persist the specified results.
*/
- public suspend fun finish(id: String, results: List<WebExperimentMonitor.Result>) {
+ public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
- "total_requested_burst" to results.map { it.totalRequestedBurst },
- "total_granted_burst" to results.map { it.totalGrantedBurst },
- "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst },
- "total_interfered_burst" to results.map { it.totalInterferedBurst },
+ "total_requested_burst" to results.map { it.totalWork },
+ "total_granted_burst" to results.map { it.totalGrantedWork },
+ "total_overcommitted_burst" to results.map { it.totalOvercommittedWork },
+ "total_interfered_burst" to results.map { it.totalInterferedWork },
"mean_cpu_usage" to results.map { it.meanCpuUsage },
"mean_cpu_demand" to results.map { it.meanCpuDemand },
"mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
new file mode 100644
index 00000000..c8e58dde
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import mu.KotlinLogging
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import kotlin.math.max
+
+/**
+ * A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebComputeMonitor : ComputeMonitor {
+ private val logger = KotlinLogging.logger {}
+
+ override fun onStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+ }
+
+ override fun record(data: HostData) {
+ val duration = 5 * 60 * 1000L
+ val slices = duration / SLICE_LENGTH
+
+ hostAggregateMetrics = AggregateHostMetrics(
+ hostAggregateMetrics.totalWork + data.totalWork,
+ hostAggregateMetrics.totalGrantedWork + data.grantedWork,
+ hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
+            hostAggregateMetrics.totalInterferedWork + data.interferedWork,
+ hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
+ hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0
+ )
+
+ hostMetrics.compute(data.host) { _, prev ->
+ HostMetrics(
+ (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ data.instanceCount + (prev?.instanceCount ?: 0),
+ 1 + (prev?.count ?: 0)
+ )
+ }
+ }
+
+ private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+ private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+ private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+ data class AggregateHostMetrics(
+ val totalWork: Double = 0.0,
+ val totalGrantedWork: Double = 0.0,
+ val totalOvercommittedWork: Double = 0.0,
+ val totalInterferedWork: Double = 0.0,
+ val totalPowerDraw: Double = 0.0,
+ val totalFailureSlices: Long = 0,
+ val totalFailureVmSlices: Long = 0,
+ )
+
+ data class HostMetrics(
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val instanceCount: Long,
+ val count: Long
+ )
+
+ private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
+
+ override fun record(data: ServiceData) {
+ serviceMetrics = AggregateServiceMetrics(
+ max(data.instanceCount, serviceMetrics.vmTotalCount),
+ max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount),
+ max(data.runningInstanceCount, serviceMetrics.vmActiveCount),
+ max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount),
+ max(data.failedInstanceCount, serviceMetrics.vmFailedCount),
+ )
+ }
+
+ public data class AggregateServiceMetrics(
+ val vmTotalCount: Int = 0,
+ val vmWaitingCount: Int = 0,
+ val vmActiveCount: Int = 0,
+ val vmInactiveCount: Int = 0,
+ val vmFailedCount: Int = 0
+ )
+
+ public fun getResult(): Result {
+ return Result(
+ hostAggregateMetrics.totalWork,
+ hostAggregateMetrics.totalGrantedWork,
+ hostAggregateMetrics.totalOvercommittedWork,
+ hostAggregateMetrics.totalInterferedWork,
+ hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+ hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+ hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(),
+ hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+ hostAggregateMetrics.totalPowerDraw,
+ hostAggregateMetrics.totalFailureSlices,
+ hostAggregateMetrics.totalFailureVmSlices,
+ serviceMetrics.vmTotalCount,
+ serviceMetrics.vmWaitingCount,
+ serviceMetrics.vmInactiveCount,
+ serviceMetrics.vmFailedCount,
+ )
+ }
+
+ data class Result(
+ val totalWork: Double,
+ val totalGrantedWork: Double,
+ val totalOvercommittedWork: Double,
+ val totalInterferedWork: Double,
+ val meanCpuUsage: Double,
+ val meanCpuDemand: Double,
+ val meanNumDeployedImages: Double,
+ val maxNumDeployedImages: Double,
+ val totalPowerDraw: Double,
+ val totalFailureSlices: Long,
+ val totalFailureVmSlices: Long,
+ val totalVmsSubmitted: Int,
+ val totalVmsQueued: Int,
+ val totalVmsFinished: Int,
+ val totalVmsFailed: Int
+ )
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
deleted file mode 100644
index 281c8dbb..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import kotlin.math.max
-
-/**
- * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
- */
-public class WebExperimentMonitor : ExperimentMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host,
- ) {
- processHostEvent(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
-
- private fun processHostEvent(event: HostEvent) {
- val slices = event.duration / SLICE_LENGTH
-
- hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
- hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
- hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
- hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0
- )
-
- hostMetrics.compute(event.host) { _, prev ->
- HostMetrics(
- (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
- event.vmCount + (prev?.vmCount ?: 0),
- 1 + (prev?.count ?: 0)
- )
- }
- }
-
- private val SLICE_LENGTH: Long = 5 * 60 * 1000
-
- public data class AggregateHostMetrics(
- val totalRequestedBurst: Long = 0,
- val totalGrantedBurst: Long = 0,
- val totalOvercommittedBurst: Long = 0,
- val totalInterferedBurst: Long = 0,
- val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
- )
-
- public data class HostMetrics(
- val cpuUsage: Double,
- val cpuDemand: Double,
- val vmCount: Long,
- val count: Long
- )
-
- private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerMetrics = AggregateProvisionerMetrics(
- max(totalVmCount, provisionerMetrics.vmTotalCount),
- max(waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(activeVmCount, provisionerMetrics.vmActiveCount),
- max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(failedVmCount, provisionerMetrics.vmFailedCount),
- )
- }
-
- public data class AggregateProvisionerMetrics(
- val vmTotalCount: Int = 0,
- val vmWaitingCount: Int = 0,
- val vmActiveCount: Int = 0,
- val vmInactiveCount: Int = 0,
- val vmFailedCount: Int = 0
- )
-
- override fun close() {}
-
- public fun getResult(): Result {
- return Result(
- hostAggregateMetrics.totalRequestedBurst,
- hostAggregateMetrics.totalGrantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst,
- hostAggregateMetrics.totalInterferedBurst,
- hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
- hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
- hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
- provisionerMetrics.vmTotalCount,
- provisionerMetrics.vmWaitingCount,
- provisionerMetrics.vmInactiveCount,
- provisionerMetrics.vmFailedCount,
- )
- }
-
- public data class Result(
- public val totalRequestedBurst: Long,
- public val totalGrantedBurst: Long,
- public val totalOvercommittedBurst: Long,
- public val totalInterferedBurst: Long,
- public val meanCpuUsage: Double,
- public val meanCpuDemand: Double,
- public val meanNumDeployedImages: Double,
- public val maxNumDeployedImages: Double,
- public val totalPowerDraw: Double,
- public val totalFailureSlices: Long,
- public val totalFailureVmSlices: Long,
- public val totalVmsSubmitted: Int,
- public val totalVmsQueued: Int,
- public val totalVmsFinished: Int,
- public val totalVmsFailed: Int
- )
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 5cae0a31..cee8887b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -45,6 +45,7 @@ include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-telemetry:opendc-telemetry-api")
include(":opendc-telemetry:opendc-telemetry-sdk")
+include(":opendc-telemetry:opendc-telemetry-compute")
include(":opendc-trace:opendc-trace-api")
include(":opendc-trace:opendc-trace-gwf")
include(":opendc-trace:opendc-trace-swf")