diff options
26 files changed, 603 insertions, 801 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 65cebe1b..1a4caf91 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 9d23a5dd..0230409e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,16 +24,10 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.ReplayScheduler @@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel @@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock import kotlin.coroutines.resume @@ -66,11 +58,6 @@ import kotlin.math.max import kotlin.random.Random /** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** * Construct the failure domain for the experiments. */ fun createFailureDomain( @@ -169,85 +156,6 @@ suspend fun withComputeService( } /** - * Attach the specified monitor to the VM provisioner. - */ -suspend fun withMonitor( - monitor: ExperimentMonitor, - clock: Clock, - metricProducer: MetricProducer, - scheduler: ComputeService, - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.reportHostStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.reportHostStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), - exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ - ) - - try { - block(this) - } finally { - reader.close() - monitor.close() - } -} - -class ComputeMetrics { - var submittedVms: Int = 0 - var queuedVms: Int = 0 - var runningVms: Int = 0 - var unscheduledVms: Int = 0 - var finishedVms: Int = 0 - var hosts: Int = 0 - var availableHosts = 0 -} - -/** - * Collect the metrics of the compute service. - */ -fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - return extractComputeMetrics(metricProducer.collectAllMetrics()) -} - -/** - * Extract an [ComputeMetrics] object from the specified list of metric data. - */ -internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics { - val res = ComputeMetrics() - for (metric in metrics) { - val points = metric.longSumData.points - - if (points.isEmpty()) { - continue - } - - val value = points.first().value.toInt() - when (metric.name) { - "servers.submitted" -> res.submittedVms = value - "servers.waiting" -> res.queuedVms = value - "servers.unscheduled" -> res.unscheduledVms = value - "servers.active" -> res.runningVms = value - "servers.finished" -> res.finishedVms = value - "hosts.total" -> res.hosts = value - "hosts.available" -> res.availableHosts = value - } - } - - return res -} - -/** * Process the trace. */ suspend fun processTrace( @@ -255,7 +163,7 @@ suspend fun processTrace( reader: TraceReader<SimWorkload>, scheduler: ComputeService, chan: Channel<Unit>, - monitor: ExperimentMonitor? = null, + monitor: ComputeMonitor? = null, ) { val client = scheduler.newClient() val image = client.newImage("vm-image") @@ -289,7 +197,7 @@ suspend fun processTrace( suspendCancellableCoroutine { cont -> server.watch(object : ServerWatcher { override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.reportVmStateChange(clock.millis(), server, newState) + monitor?.onStateChange(clock.millis(), server, newState) if (newState == ServerState.TERMINATED) { cont.resume(Unit) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index ee832af8..d3bba182 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.opendc.experiments.capelin.env.ClusterEnvironmentReader +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.io.FileInputStream import java.util.* @@ -127,7 +129,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) - val monitor = ParquetExperimentMonitor( + val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 @@ -148,7 +150,7 @@ abstract class Portfolio(name: String) : Experiment(name) { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -159,9 +161,16 @@ abstract class Portfolio(name: String) : Experiment(name) { } failureDomain?.cancel() + monitor.close() } - val monitorResults = collectMetrics(meterProvider as MetricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + logger.debug { + "Finish " + + "SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.activeHostCount}" + } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index 897a6692..c5cb80e2 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry.parquet +package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.telemetry.Event import org.opendc.trace.util.parquet.LocalOutputFile import java.io.Closeable import java.io.File @@ -36,20 +36,20 @@ import java.util.concurrent.BlockingQueue import kotlin.concurrent.thread /** - * The logging instance to use. + * A writer that writes data in Parquet format. */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. - */ -public open class ParquetEventWriter<in T : Event>( +public open class ParquetDataWriter<in T>( private val path: File, private val schema: Schema, private val converter: (T, GenericData.Record) -> Unit, private val bufferSize: Int = 4096 ) : Runnable, Closeable { /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** * The writer to write the Parquet file. */ private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) @@ -57,6 +57,7 @@ public open class ParquetEventWriter<in T : Event>( .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() /** @@ -67,7 +68,7 @@ public open class ParquetEventWriter<in T : Event>( /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } + private val writerThread = thread(start = false, name = this.toString()) { run() } /** * Write the specified metrics to the database. @@ -100,7 +101,7 @@ public open class ParquetEventWriter<in T : Event>( is Action.Write<*> -> { val record = GenericData.Record(schema) @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) + converter(action.data as T, record) writer.write(record) } } @@ -121,6 +122,6 @@ public open class ParquetEventWriter<in T : Event>( /** * Write the specified metrics to the database. */ - public data class Write<out T : Event>(val event: T) : Action() + public data class Write<out T>(val data: T) : Action() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index 7631f55f..79b84e9d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,22 +20,36 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +package org.opendc.experiments.capelin.export.parquet -import org.opendc.compute.api.Server +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File /** - * A periodic report of a virtual machine's metrics. + * A [ComputeMonitor] that logs the events to a Parquet file. */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") +public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..8912c12e --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.telemetry.compute.table.HostData +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<HostData>(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + public companion object { + private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> + record.put("host_id", data.host.name) + record.put("state", data.host.state.name) + record.put("timestamp", data.timestamp) + record.put("total_work", data.totalWork) + record.put("granted_work", data.grantedWork) + record.put("overcommitted_work", data.overcommittedWork) + record.put("interfered_work", data.interferedWork) + record.put("cpu_usage", data.cpuUsage) + record.put("cpu_demand", data.cpuDemand) + record.put("power_draw", data.powerDraw) + record.put("instance_count", data.instanceCount) + record.put("cores", data.host.model.cpuCount) + } + + private val schema: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("requested_work").type().longType().noDefault() + .name("granted_work").type().longType().noDefault() + .name("overcommitted_work").type().longType().noDefault() + .name("interfered_work").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("instance_count").type().intType().noDefault() + .name("cores").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 8feff8d9..36d630f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -20,46 +20,46 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry.parquet +package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** - * A Parquet event writer for [ProvisionerEvent]s. + * A Parquet event writer for [ServiceData]s. */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) { +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) { - override fun toString(): String = "provisioner-writer" + override fun toString(): String = "service-writer" public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) + private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> + record.put("timestamp", data.timestamp) + record.put("host_total_count", data.hostCount) + record.put("host_available_count", data.activeHostCount) + record.put("instance_total_count", data.instanceCount) + record.put("instance_active_count", data.runningInstanceCount) + record.put("instance_inactive_count", data.finishedInstanceCount) + record.put("instance_waiting_count", data.queuedInstanceCount) + record.put("instance_failed_count", data.failedInstanceCount) } private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") + .record("service") + .namespace("org.opendc.telemetry.compute") .fields() .name("timestamp").type().longType().noDefault() .name("host_total_count").type().intType().noDefault() .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() + .name("instance_total_count").type().intType().noDefault() + .name("instance_active_count").type().intType().noDefault() + .name("instance_inactive_count").type().intType().noDefault() + .name("instance_waiting_count").type().intType().noDefault() + .name("instance_failed_count").type().intType().noDefault() .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index c49499da..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. - */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) { - hostWriter.write( - HostEvent( - time, - 5 * 60 * 1000L, - host, - instanceCount, - totalWork.toLong(), - grantedWork.toLong(), - overcommittedWork.toLong(), - interferedWork.toLong(), - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - override fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerWriter.write( - ProvisionerEvent( - time, - totalHostCount, - availableHostCount, - totalVmCount, - activeVmCount, - inactiveVmCount, - waitingVmCount, - failedVmCount - ) - ) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index c8fe1cb2..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index 946410eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 24d8f768..abd8efeb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.util.* @@ -80,7 +82,6 @@ class CapelinIntegrationTest { ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> @@ -98,7 +99,7 @@ class CapelinIntegrationTest { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -111,15 +112,21 @@ class CapelinIntegrationTest { failureDomain?.cancel() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, + { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, + { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -145,7 +152,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -156,8 +163,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -189,7 +202,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -200,8 +213,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -239,7 +258,7 @@ class CapelinIntegrationTest { chan ) - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -252,8 +271,14 @@ class CapelinIntegrationTest { failureDomain.cancel() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -283,34 +308,19 @@ class CapelinIntegrationTest { return ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentMonitor { + class TestExperimentReporter : ComputeMonitor { var totalWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L var totalInterferedWork = 0L var totalPowerDraw = 0.0 - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host, - ) { - this.totalWork += totalWork.toLong() - totalGrantedWork += grantedWork.toLong() - totalOvercommittedWork += overcommittedWork.toLong() - totalInterferedWork += interferedWork.toLong() - totalPowerDraw += powerDraw + override fun record(data: HostData) { + this.totalWork += data.totalWork.toLong() + totalGrantedWork += data.grantedWork.toLong() + totalOvercommittedWork += data.overcommittedWork.toLong() + totalInterferedWork += data.interferedWork.toLong() + totalPowerDraw += data.powerDraw } - - override fun close() {} } } diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index 7d34d098..40ac2967 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.kotlin.logging) implementation(libs.config) diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index e64e20a2..02aaab3c 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.time.Clock import java.util.* @@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { ) val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) + val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { } } - val monitorResults = collectMetrics(meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) logger.debug { - "Finish SUBMIT=${monitorResults.submittedVms} " + - "FAIL=${monitorResults.unscheduledVms} " + - "QUEUE=${monitorResults.queuedVms} " + - "RUNNING=${monitorResults.runningVms}" + "Finish SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.runningInstanceCount}" } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index c29e116e..6a3de9bc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,14 +20,18 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +description = "Telemetry for OpenDC Compute" -/** - * An event that occurs within the system. - */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTelemetry.opendcTelemetrySdk) + + implementation(projects.opendcCompute.opendcComputeSimulator) + implementation(libs.opentelemetry.semconv) + implementation(libs.kotlin.logging) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 79be9ac4..95e7ff9e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -20,40 +20,40 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.monitor +package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host -import org.opendc.experiments.capelin.extractComputeMetrics +import org.opendc.telemetry.compute.table.HostData import java.time.Clock /** - * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. + * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ExperimentMetricExporter( - private val monitor: ExperimentMonitor, +public class ComputeMetricExporter( private val clock: Clock, - private val hosts: Map<String, Host> + private val hosts: Map<String, Host>, + private val monitor: ComputeMonitor ) : MetricExporter { override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { reportHostMetrics(metrics) - reportProvisionerMetrics(metrics) + reportServiceMetrics(metrics) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() } } - private var lastHostMetrics: Map<String, HostMetrics> = emptyMap() - private val hostMetricsSingleton = HostMetrics() + private var lastHostMetrics: Map<String, HBuffer> = emptyMap() + private val hostMetricsSingleton = HBuffer() private fun reportHostMetrics(metrics: Collection<MetricData>) { - val hostMetrics = mutableMapOf<String, HostMetrics>() + val hostMetrics = mutableMapOf<String, HBuffer>() for (metric in metrics) { when (metric.name) { @@ -74,69 +74,59 @@ public class ExperimentMetricExporter( val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts[id] ?: continue - monitor.reportHostData( - clock.millis(), - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.powerDraw, - hostMetric.instanceCount, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, - host + monitor.record( + HostData( + clock.millis(), + host, + hostMetric.totalWork - lastHostMetric.totalWork, + hostMetric.grantedWork - lastHostMetric.grantedWork, + hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, + hostMetric.interferedWork - lastHostMetric.interferedWork, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.instanceCount, + hostMetric.powerDraw, + hostMetric.uptime - lastHostMetric.uptime, + hostMetric.downtime - lastHostMetric.downtime, + ) ) } lastHostMetrics = hostMetrics } - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { val points = data.doubleSummaryData?.points ?: emptyList() for (point in points) { val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 block(hostMetric, avg) } } - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } block(hostMetric, point.value) } } - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { val points = data?.doubleSumData?.points ?: emptyList() for (point in points) { val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } block(hostMetric, point.value) } } - private fun reportProvisionerMetrics(metrics: Collection<MetricData>) { - val res = extractComputeMetrics(metrics) - - monitor.reportServiceData( - clock.millis(), - res.hosts, - res.availableHosts, - res.submittedVms, - res.runningVms, - res.finishedVms, - res.queuedVms, - res.unscheduledVms - ) - } - - private class HostMetrics { + /** + * A buffer for host metrics before they are reported. + */ + private class HBuffer { var totalWork: Double = 0.0 var grantedWork: Double = 0.0 var overcommittedWork: Double = 0.0 @@ -149,6 +139,10 @@ public class ExperimentMetricExporter( var downtime: Long = 0 } + private fun reportServiceMetrics(metrics: Collection<MetricData>) { + monitor.record(extractServiceMetrics(clock.millis(), metrics)) + } + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index dc28b816..ec303b37 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -20,56 +20,42 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.monitor +package org.opendc.telemetry.compute import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData /** - * A monitor watches the events of an experiment. + * A monitor that tracks the metrics and events of the OpenDC Compute service. */ -public interface ExperimentMonitor : AutoCloseable { +public interface ComputeMonitor { /** - * This method is invoked when the state of a VM changes. + * This method is invoked when the state of a [Server] changes. */ - public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} + public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {} /** - * This method is invoked when the state of a host changes. + * This method is invoked when the state of a [Host] changes. */ - public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} + public fun onStateChange(time: Long, host: Host, newState: HostState) {} /** - * This method is invoked for a host for each slice that is finishes. + * Record the specified [data]. */ - public fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) {} + public fun record(data: ServerData) {} /** - * This method is invoked for reporting service data. + * Record the specified [data]. */ - public fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) {} + public fun record(data: HostData) {} + + /** + * Record the specified [data]. + */ + public fun record(data: ServiceData) {} } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt new file mode 100644 index 00000000..d3d983b9 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import java.time.Clock + +/** + * Attach the specified monitor to the OpenDC Compute service. + */ +public suspend fun withMonitor( + scheduler: ComputeService, + clock: Clock, + metricProducer: MetricProducer, + monitor: ComputeMonitor, + exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */ + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + // Monitor host events + for (host in scheduler.hosts) { + monitor.onStateChange(clock.millis(), host, HostState.UP) + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, newState: HostState) { + monitor.onStateChange(clock.millis(), host, newState) + } + }) + } + + val reader = CoroutineMetricReader( + this, + listOf(metricProducer), + ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor), + exportInterval + ) + + try { + block(this) + } finally { + reader.close() + } +} + +/** + * Collect the metrics of the compute service. + */ +public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData { + return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) +} + +/** + * Extract a [ServiceData] object from the specified list of metric data. + */ +public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData { + var submittedVms = 0 + var queuedVms = 0 + var unscheduledVms = 0 + var runningVms = 0 + var finishedVms = 0 + var hosts = 0 + var availableHosts = 0 + + for (metric in metrics) { + val points = metric.longSumData.points + + if (points.isEmpty()) { + continue + } + + val value = points.first().value.toInt() + when (metric.name) { + "servers.submitted" -> submittedVms = value + "servers.waiting" -> queuedVms = value + "servers.unscheduled" -> unscheduledVms = value + "servers.active" -> runningVms = value + "servers.finished" -> finishedVms = value + "hosts.total" -> hosts = value + "hosts.available" -> availableHosts = value + } + } + + return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index 899fc9b1..8e6c34d0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,24 +20,24 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +package org.opendc.telemetry.compute.table import org.opendc.compute.service.driver.Host /** - * A periodic report of the host machine metrics. + * A trace entry for a particular host. */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, +public data class HostData( + public val timestamp: Long, public val host: Host, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, + public val totalWork: Double, + public val grantedWork: Double, + public val overcommittedWork: Double, + public val interferedWork: Double, public val cpuUsage: Double, public val cpuDemand: Double, + public val instanceCount: Int, public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") + public val uptime: Long, + public val downtime: Long, +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 6c8fc941..2a9fa8a6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,15 +20,16 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +package org.opendc.telemetry.compute.table -import org.opendc.experiments.capelin.Portfolio +import org.opendc.compute.api.Server /** - * A periodic report of the host machine metrics. + * A trace entry for a particular server. */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") +public data class ServerData( + public val timestamp: Long, + public val server: Server, + public val uptime: Long, + public val downtime: Long, +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index 539c9bc9..f6ff5db5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,18 +20,18 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +package org.opendc.telemetry.compute.table /** - * A periodic report of the provisioner's metrics. + * A trace entry for the compute service. */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") +public data class ServiceData( + public val timestamp: Long, + public val hostCount: Int, + public val activeHostCount: Int, + public val instanceCount: Int, + public val runningInstanceCount: Int, + public val finishedInstanceCount: Int, + public val queuedInstanceCount: Int, + public val failedInstanceCount: Int +) diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index ec4a4673..99051e8e 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.kotlin.logging) implementation(libs.clikt) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 53d50357..65527141 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -47,6 +47,8 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -131,7 +133,7 @@ class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebExperimentMonitor.Result> { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> { val id = scenario.id logger.info { "Constructing performance interference model" } @@ -176,8 +178,8 @@ class RunnerCli : CliktCommand(name = "runner") { environment: EnvironmentReader, traceReader: RawParquetTraceReader, interferenceModel: VmInterferenceModel? - ): WebExperimentMonitor.Result { - val monitor = WebExperimentMonitor() + ): WebComputeMonitor.Result { + val monitor = WebComputeMonitor() try { runBlockingSimulation { @@ -220,7 +222,7 @@ class RunnerCli : CliktCommand(name = "runner") { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -233,8 +235,14 @@ class RunnerCli : CliktCommand(name = "runner") { failureDomain?.cancel() } - val monitorResults = collectMetrics(metricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + logger.debug { + "Finish " + + "SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.runningInstanceCount}" + } } } catch (cause: Throwable) { logger.warn(cause) { "Experiment failed" } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index 4044cec9..e0e3488f 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -61,14 +61,14 @@ public class ScenarioManager(private val client: ApiClient) { /** * Persist the specified results. */ - public suspend fun finish(id: String, results: List<WebExperimentMonitor.Result>) { + public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) { client.updateJob( id, SimulationState.FINISHED, mapOf( - "total_requested_burst" to results.map { it.totalRequestedBurst }, - "total_granted_burst" to results.map { it.totalGrantedBurst }, - "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst }, - "total_interfered_burst" to results.map { it.totalInterferedBurst }, + "total_requested_burst" to results.map { it.totalWork }, + "total_granted_burst" to results.map { it.totalGrantedWork }, + "total_overcommitted_burst" to results.map { it.totalOvercommittedWork }, + "total_interfered_burst" to results.map { it.totalInterferedWork }, "mean_cpu_usage" to results.map { it.meanCpuUsage }, "mean_cpu_demand" to results.map { it.meanCpuDemand }, "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt new file mode 100644 index 00000000..c8e58dde --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import mu.KotlinLogging +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import kotlin.math.max + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. + */ +public class WebComputeMonitor : ComputeMonitor { + private val logger = KotlinLogging.logger {} + + override fun onStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } + } + + override fun record(data: HostData) { + val duration = 5 * 60 * 1000L + val slices = duration / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalWork + data.totalWork, + hostAggregateMetrics.totalGrantedWork + data.grantedWork, + hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, + hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, + hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, + hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, + hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + ) + + hostMetrics.compute(data.host) { _, prev -> + HostMetrics( + (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.instanceCount + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60 * 1000 + + data class AggregateHostMetrics( + val totalWork: Double = 0.0, + val totalGrantedWork: Double = 0.0, + val totalOvercommittedWork: Double = 0.0, + val totalInterferedWork: Double = 0.0, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Long = 0, + val totalFailureVmSlices: Long = 0, + ) + + data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(data: ServiceData) { + serviceMetrics = AggregateServiceMetrics( + max(data.instanceCount, serviceMetrics.vmTotalCount), + max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount), + max(data.runningInstanceCount, serviceMetrics.vmActiveCount), + max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount), + max(data.failedInstanceCount, serviceMetrics.vmFailedCount), + ) + } + + public data class AggregateServiceMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + public fun getResult(): Result { + return Result( + hostAggregateMetrics.totalWork, + hostAggregateMetrics.totalGrantedWork, + hostAggregateMetrics.totalOvercommittedWork, + hostAggregateMetrics.totalInterferedWork, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices, + hostAggregateMetrics.totalFailureVmSlices, + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + data class Result( + val totalWork: Double, + val totalGrantedWork: Double, + val totalOvercommittedWork: Double, + val totalInterferedWork: Double, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt deleted file mode 100644 index 281c8dbb..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.telemetry.HostEvent -import kotlin.math.max - -/** - * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat. - */ -public class WebExperimentMonitor : ExperimentMonitor { - private val logger = KotlinLogging.logger {} - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host, - ) { - processHostEvent( - HostEvent( - time, - 5 * 60 * 1000L, - host, - instanceCount, - totalWork.toLong(), - grantedWork.toLong(), - overcommittedWork.toLong(), - interferedWork.toLong(), - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() - - private fun processHostEvent(event: HostEvent) { - val slices = event.duration / SLICE_LENGTH - - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalRequestedBurst + event.requestedBurst, - hostAggregateMetrics.totalGrantedBurst + event.grantedBurst, - hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, - hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, - hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0 - ) - - hostMetrics.compute(event.host) { _, prev -> - HostMetrics( - (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), - event.vmCount + (prev?.vmCount ?: 0), - 1 + (prev?.count ?: 0) - ) - } - } - - private val SLICE_LENGTH: Long = 5 * 60 * 1000 - - public data class AggregateHostMetrics( - val totalRequestedBurst: Long = 0, - val totalGrantedBurst: Long = 0, - val totalOvercommittedBurst: Long = 0, - val totalInterferedBurst: Long = 0, - val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, - ) - - public data class HostMetrics( - val cpuUsage: Double, - val cpuDemand: Double, - val vmCount: Long, - val count: Long - ) - - private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - - override fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerMetrics = AggregateProvisionerMetrics( - max(totalVmCount, provisionerMetrics.vmTotalCount), - max(waitingVmCount, provisionerMetrics.vmWaitingCount), - max(activeVmCount, provisionerMetrics.vmActiveCount), - max(inactiveVmCount, provisionerMetrics.vmInactiveCount), - max(failedVmCount, provisionerMetrics.vmFailedCount), - ) - } - - public data class AggregateProvisionerMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - - override fun close() {} - - public fun getResult(): Result { - return Result( - hostAggregateMetrics.totalRequestedBurst, - hostAggregateMetrics.totalGrantedBurst, - hostAggregateMetrics.totalOvercommittedBurst, - hostAggregateMetrics.totalInterferedBurst, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(), - hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, - hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, - provisionerMetrics.vmTotalCount, - provisionerMetrics.vmWaitingCount, - provisionerMetrics.vmInactiveCount, - provisionerMetrics.vmFailedCount, - ) - } - - public data class Result( - public val totalRequestedBurst: Long, - public val totalGrantedBurst: Long, - public val totalOvercommittedBurst: Long, - public val totalInterferedBurst: Long, - public val meanCpuUsage: Double, - public val meanCpuDemand: Double, - public val meanNumDeployedImages: Double, - public val maxNumDeployedImages: Double, - public val totalPowerDraw: Double, - public val totalFailureSlices: Long, - public val totalFailureVmSlices: Long, - public val totalVmsSubmitted: Int, - public val totalVmsQueued: Int, - public val totalVmsFinished: Int, - public val totalVmsFailed: Int - ) -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5cae0a31..cee8887b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -45,6 +45,7 @@ include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") +include(":opendc-telemetry:opendc-telemetry-compute") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") |
