diff options
Diffstat (limited to 'opendc-experiments')
19 files changed, 218 insertions, 853 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 65cebe1b..1a4caf91 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 9d23a5dd..0230409e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,16 +24,10 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.ReplayScheduler @@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel @@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock import kotlin.coroutines.resume @@ -66,11 +58,6 @@ import kotlin.math.max import kotlin.random.Random /** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** * Construct the failure domain for the experiments. */ fun createFailureDomain( @@ -169,85 +156,6 @@ suspend fun withComputeService( } /** - * Attach the specified monitor to the VM provisioner. - */ -suspend fun withMonitor( - monitor: ExperimentMonitor, - clock: Clock, - metricProducer: MetricProducer, - scheduler: ComputeService, - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.reportHostStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.reportHostStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), - exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ - ) - - try { - block(this) - } finally { - reader.close() - monitor.close() - } -} - -class ComputeMetrics { - var submittedVms: Int = 0 - var queuedVms: Int = 0 - var runningVms: Int = 0 - var unscheduledVms: Int = 0 - var finishedVms: Int = 0 - var hosts: Int = 0 - var availableHosts = 0 -} - -/** - * Collect the metrics of the compute service. - */ -fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - return extractComputeMetrics(metricProducer.collectAllMetrics()) -} - -/** - * Extract an [ComputeMetrics] object from the specified list of metric data. - */ -internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics { - val res = ComputeMetrics() - for (metric in metrics) { - val points = metric.longSumData.points - - if (points.isEmpty()) { - continue - } - - val value = points.first().value.toInt() - when (metric.name) { - "servers.submitted" -> res.submittedVms = value - "servers.waiting" -> res.queuedVms = value - "servers.unscheduled" -> res.unscheduledVms = value - "servers.active" -> res.runningVms = value - "servers.finished" -> res.finishedVms = value - "hosts.total" -> res.hosts = value - "hosts.available" -> res.availableHosts = value - } - } - - return res -} - -/** * Process the trace. */ suspend fun processTrace( @@ -255,7 +163,7 @@ suspend fun processTrace( reader: TraceReader<SimWorkload>, scheduler: ComputeService, chan: Channel<Unit>, - monitor: ExperimentMonitor? = null, + monitor: ComputeMonitor? = null, ) { val client = scheduler.newClient() val image = client.newImage("vm-image") @@ -289,7 +197,7 @@ suspend fun processTrace( suspendCancellableCoroutine { cont -> server.watch(object : ServerWatcher { override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.reportVmStateChange(clock.millis(), server, newState) + monitor?.onStateChange(clock.millis(), server, newState) if (newState == ServerState.TERMINATED) { cont.resume(Unit) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index ee832af8..d3bba182 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.opendc.experiments.capelin.env.ClusterEnvironmentReader +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.io.FileInputStream import java.util.* @@ -127,7 +129,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) - val monitor = ParquetExperimentMonitor( + val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 @@ -148,7 +150,7 @@ abstract class Portfolio(name: String) : Experiment(name) { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -159,9 +161,16 @@ abstract class Portfolio(name: String) : Experiment(name) { } failureDomain?.cancel() + monitor.close() } - val monitorResults = collectMetrics(meterProvider as MetricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + logger.debug { + "Finish " + + "SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.activeHostCount}" + } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index 897a6692..c5cb80e2 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry.parquet +package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.telemetry.Event import org.opendc.trace.util.parquet.LocalOutputFile import java.io.Closeable import java.io.File @@ -36,20 +36,20 @@ import java.util.concurrent.BlockingQueue import kotlin.concurrent.thread /** - * The logging instance to use. + * A writer that writes data in Parquet format. */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. - */ -public open class ParquetEventWriter<in T : Event>( +public open class ParquetDataWriter<in T>( private val path: File, private val schema: Schema, private val converter: (T, GenericData.Record) -> Unit, private val bufferSize: Int = 4096 ) : Runnable, Closeable { /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** * The writer to write the Parquet file. */ private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) @@ -57,6 +57,7 @@ public open class ParquetEventWriter<in T : Event>( .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() /** @@ -67,7 +68,7 @@ public open class ParquetEventWriter<in T : Event>( /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } + private val writerThread = thread(start = false, name = this.toString()) { run() } /** * Write the specified metrics to the database. @@ -100,7 +101,7 @@ public open class ParquetEventWriter<in T : Event>( is Action.Write<*> -> { val record = GenericData.Record(schema) @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) + converter(action.data as T, record) writer.write(record) } } @@ -121,6 +122,6 @@ public open class ParquetEventWriter<in T : Event>( /** * Write the specified metrics to the database. */ - public data class Write<out T : Event>(val event: T) : Action() + public data class Write<out T>(val data: T) : Action() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index c29e116e..79b84e9d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,14 +20,36 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry +package org.opendc.experiments.capelin.export.parquet + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File /** - * An event that occurs within the system. + * A [ComputeMonitor] that logs the events to a Parquet file. */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long +public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..8912c12e --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.telemetry.compute.table.HostData +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<HostData>(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + public companion object { + private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> + record.put("host_id", data.host.name) + record.put("state", data.host.state.name) + record.put("timestamp", data.timestamp) + record.put("total_work", data.totalWork) + record.put("granted_work", data.grantedWork) + record.put("overcommitted_work", data.overcommittedWork) + record.put("interfered_work", data.interferedWork) + record.put("cpu_usage", data.cpuUsage) + record.put("cpu_demand", data.cpuDemand) + record.put("power_draw", data.powerDraw) + record.put("instance_count", data.instanceCount) + record.put("cores", data.host.model.cpuCount) + } + + private val schema: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("requested_work").type().longType().noDefault() + .name("granted_work").type().longType().noDefault() + .name("overcommitted_work").type().longType().noDefault() + .name("interfered_work").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("instance_count").type().intType().noDefault() + .name("cores").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 8feff8d9..36d630f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -20,46 +20,46 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.telemetry.parquet +package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent +import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** - * A Parquet event writer for [ProvisionerEvent]s. + * A Parquet event writer for [ServiceData]s. */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) { +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) { - override fun toString(): String = "provisioner-writer" + override fun toString(): String = "service-writer" public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) + private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> + record.put("timestamp", data.timestamp) + record.put("host_total_count", data.hostCount) + record.put("host_available_count", data.activeHostCount) + record.put("instance_total_count", data.instanceCount) + record.put("instance_active_count", data.runningInstanceCount) + record.put("instance_inactive_count", data.finishedInstanceCount) + record.put("instance_waiting_count", data.queuedInstanceCount) + record.put("instance_failed_count", data.failedInstanceCount) } private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") + .record("service") + .namespace("org.opendc.telemetry.compute") .fields() .name("timestamp").type().longType().noDefault() .name("host_total_count").type().intType().noDefault() .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() + .name("instance_total_count").type().intType().noDefault() + .name("instance_active_count").type().intType().noDefault() + .name("instance_inactive_count").type().intType().noDefault() + .name("instance_waiting_count").type().intType().noDefault() + .name("instance_failed_count").type().intType().noDefault() .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt deleted file mode 100644 index 79be9ac4..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.driver.Host -import org.opendc.experiments.capelin.extractComputeMetrics -import java.time.Clock - -/** - * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. - */ -public class ExperimentMetricExporter( - private val monitor: ExperimentMonitor, - private val clock: Clock, - private val hosts: Map<String, Host> -) : MetricExporter { - - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - return try { - reportHostMetrics(metrics) - reportProvisionerMetrics(metrics) - CompletableResultCode.ofSuccess() - } catch (e: Throwable) { - CompletableResultCode.ofFailure() - } - } - - private var lastHostMetrics: Map<String, HostMetrics> = emptyMap() - private val hostMetricsSingleton = HostMetrics() - - private fun reportHostMetrics(metrics: Collection<MetricData>) { - val hostMetrics = mutableMapOf<String, HostMetrics>() - - for (metric in metrics) { - when (metric.name) { - "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } - "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v } - "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } - "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } - "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } - "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } - "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } - "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } - "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } - } - } - - for ((id, hostMetric) in hostMetrics) { - val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts[id] ?: continue - - monitor.reportHostData( - clock.millis(), - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.powerDraw, - hostMetric.instanceCount, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, - host - ) - } - - lastHostMetrics = hostMetrics - } - - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) - } - } - - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - block(hostMetric, point.value) - } - } - - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - block(hostMetric, point.value) - } - } - - private fun reportProvisionerMetrics(metrics: Collection<MetricData>) { - val res = extractComputeMetrics(metrics) - - monitor.reportServiceData( - clock.millis(), - res.hosts, - res.availableHosts, - res.submittedVms, - res.runningVms, - res.finishedVms, - res.queuedVms, - res.unscheduledVms - ) - } - - private class HostMetrics { - var totalWork: Double = 0.0 - var grantedWork: Double = 0.0 - var overcommittedWork: Double = 0.0 - var interferedWork: Double = 0.0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var instanceCount: Int = 0 - var powerDraw: Double = 0.0 - var uptime: Long = 0 - var downtime: Long = 0 - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt deleted file mode 100644 index dc28b816..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState - -/** - * A monitor watches the events of an experiment. - */ -public interface ExperimentMonitor : AutoCloseable { - /** - * This method is invoked when the state of a VM changes. - */ - public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a host changes. - */ - public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - public fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) {} - - /** - * This method is invoked for reporting service data. - */ - public fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index c49499da..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. - */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) { - hostWriter.write( - HostEvent( - time, - 5 * 60 * 1000L, - host, - instanceCount, - totalWork.toLong(), - grantedWork.toLong(), - overcommittedWork.toLong(), - interferedWork.toLong(), - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - override fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerWriter.write( - ProvisionerEvent( - time, - totalHostCount, - availableHostCount, - totalVmCount, - activeVmCount, - inactiveVmCount, - waitingVmCount, - failedVmCount - ) - ) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt deleted file mode 100644 index 899fc9b1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.service.driver.Host - -/** - * A periodic report of the host machine metrics. - */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, - public val host: Host, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt deleted file mode 100644 index 539c9bc9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -/** - * A periodic report of the provisioner's metrics. - */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt deleted file mode 100644 index 6c8fc941..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.experiments.capelin.Portfolio - -/** - * A periodic report of the host machine metrics. - */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt deleted file mode 100644 index 7631f55f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.api.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index c8fe1cb2..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index 946410eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 24d8f768..abd8efeb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.util.* @@ -80,7 +82,6 @@ class CapelinIntegrationTest { ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> @@ -98,7 +99,7 @@ class CapelinIntegrationTest { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -111,15 +112,21 @@ class CapelinIntegrationTest { failureDomain?.cancel() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, + { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, + { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -145,7 +152,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -156,8 +163,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -189,7 +202,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -200,8 +213,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -239,7 +258,7 @@ class CapelinIntegrationTest { chan ) - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -252,8 +271,14 @@ class CapelinIntegrationTest { failureDomain.cancel() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -283,34 +308,19 @@ class CapelinIntegrationTest { return ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentMonitor { + class TestExperimentReporter : ComputeMonitor { var totalWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L var totalInterferedWork = 0L var totalPowerDraw = 0.0 - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host, - ) { - this.totalWork += totalWork.toLong() - totalGrantedWork += grantedWork.toLong() - totalOvercommittedWork += overcommittedWork.toLong() - totalInterferedWork += interferedWork.toLong() - totalPowerDraw += powerDraw + override fun record(data: HostData) { + this.totalWork += data.totalWork.toLong() + totalGrantedWork += data.grantedWork.toLong() + totalOvercommittedWork += data.overcommittedWork.toLong() + totalInterferedWork += data.interferedWork.toLong() + totalPowerDraw += data.powerDraw } - - override fun close() {} } } diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index 7d34d098..40ac2967 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.kotlin.logging) implementation(libs.config) diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index e64e20a2..02aaab3c 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.time.Clock import java.util.* @@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { ) val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) + val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { } } - val monitorResults = collectMetrics(meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) logger.debug { - "Finish SUBMIT=${monitorResults.submittedVms} " + - "FAIL=${monitorResults.unscheduledVms} " + - "QUEUE=${monitorResults.queuedVms} " + - "RUNNING=${monitorResults.runningVms}" + "Finish SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.runningInstanceCount}" } } |
