From 144d9d0c118097900c086b7fb8b1cf22a788592b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 13 Sep 2021 12:22:32 +0200 Subject: build(telemetry): Update to OpenTelemetry 1.6.0 This change updates the opentelemetry-java library to version 1.6.0. --- gradle/libs.versions.toml | 6 +++--- .../org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../org/opendc/telemetry/compute/ComputeMetricExporter.kt | 11 ++++++++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ddede2e8..3f0e180b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,9 +13,9 @@ kotlinx-coroutines = "1.5.1" ktor = "1.6.3" log4j = "2.14.1" mockk = "1.12.0" -opentelemetry-main = "1.5.0" -opentelemetry-metrics = "1.5.0-alpha" -opentelemetry-semconv = "1.5.0-alpha" +opentelemetry-main = "1.6.0" +opentelemetry-metrics = "1.6.0-alpha" +opentelemetry-semconv = "1.6.0-alpha" parquet = "1.12.0" progressbar = "0.9.0" sentry = "5.1.2" diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 44cf92a8..cf88535d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -110,7 +110,7 @@ class CapelinIntegrationTest { { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { 
assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 95e7ff9e..57d43c60 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -59,7 +59,7 @@ public class ComputeMetricExporter( when (metric.name) { "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v } + "power.usage" -> mapDoubleHistogram(metric, hostMetrics) { m, v -> m.powerDraw = v } "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } @@ -105,6 +105,15 @@ public class ComputeMetricExporter( } } + private fun mapDoubleHistogram(data: MetricData, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { + val points = data.doubleHistogramData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } + block(hostMetric, point.sum / point.count) + } + } + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HBuffer, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { -- cgit v1.2.3 From 7960791430b0536df4704493c01d32e38f37f022 Mon Sep 17 00:00:00 2001 
From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 12:52:17 +0200 Subject: refactor(experiments): Remove energy experiments shell This change removes the energy experiments. The experiments only provided a setup for the original experiments and are not able to reproduce the results without further work. --- .../opendc-experiments-energy21/.gitignore | 3 - .../opendc-experiments-energy21/build.gradle.kts | 47 ----- .../experiments/energy21/EnergyExperiment.kt | 208 --------------------- .../src/main/resources/application.conf | 14 -- settings.gradle.kts | 1 - 5 files changed, 273 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-energy21/.gitignore delete mode 100644 opendc-experiments/opendc-experiments-energy21/build.gradle.kts delete mode 100644 opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt delete mode 100644 opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf diff --git a/opendc-experiments/opendc-experiments-energy21/.gitignore b/opendc-experiments/opendc-experiments-energy21/.gitignore deleted file mode 100644 index 55da79f8..00000000 --- a/opendc-experiments/opendc-experiments-energy21/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -input/ -output/ -.ipynb_checkpoints diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts deleted file mode 100644 index cc58e5f1..00000000 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, 
and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Experiments for the OpenDC Energy work" - -/* Build configuration */ -plugins { - `experiment-conventions` - `testing-conventions` -} - -dependencies { - api(platform(projects.opendcPlatform)) - api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcExperiments.opendcExperimentsCapelin) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.kotlin.logging) - implementation(libs.config) - - implementation(libs.parquet) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } -} diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt deleted file mode 100644 index d9194969..00000000 --- 
a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.energy21 - -import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import mu.KotlinLogging -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor -import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.* -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor -import java.io.File -import java.time.Clock -import java.util.* - -/** - * Experiments for the OpenDC project on Energy modeling. - */ -public class EnergyExperiment : Experiment("Energy Modeling 2021") { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The configuration to use. 
- */ - private val config = ConfigFactory.load().getConfig("opendc.experiments.energy21") - - /** - * The traces to test. - */ - private val trace by anyOf("solvinity") - - /** - * The power models to test. - */ - private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION) - - override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(), - subsetSize = Int.MAX_VALUE - ) - - val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) - - withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - } - - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) - logger.debug { - "Finish SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" - } - } - - /** - * Construct the environment for a simulated compute service.. 
- */ - public suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - scheduler: ComputeScheduler, - block: suspend CoroutineScope.(ComputeService) -> Unit - ): Unit = coroutineScope { - val model = createMachineModel() - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = List(64) { id -> - SimHost( - UUID(0, id.toLong()), - "node-$id", - model, - emptyMap(), - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - PerformanceScalingGovernor(), - powerModel.driver - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } - } - - /** - * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html - */ - private fun createMachineModel(): MachineModel { - val node = ProcessingNode("AMD", "am64", "EPYC 7742", 64) - val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } - val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } - - return MachineModel(cpus, memory) - } - - /** - * The power models to test. 
- */ - public enum class PowerModelType { - CUBIC { - override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4)) - }, - - LINEAR { - override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4)) - }, - - SQRT { - override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4)) - }, - - SQUARE { - override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4)) - }, - - INTERPOLATION { - override val driver: PowerDriver = SimplePowerDriver( - InterpolationPowerModel( - listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0) - ) - ) - }, - - MSE { - override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4)) - }, - - ASYMPTOTIC { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false)) - }, - - ASYMPTOTIC_DVFS { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true)) - }; - - public abstract val driver: PowerDriver - } -} diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf b/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf deleted file mode 100644 index 263da0fe..00000000 --- a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf +++ /dev/null @@ -1,14 +0,0 @@ -# Default configuration for the energy experiments -opendc.experiments.energy21 { - # Path to the directory containing the input traces - trace-path = input/traces - - # Path to the output directory to write the results to - output-path = output -} - -opendc.experiments.capelin { - env-path = input/environments/ - trace-path = input/traces/ - output-path = output -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 60e67e2b..933febe0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -31,7 +31,6 @@ include(":opendc-faas:opendc-faas-api") 
include(":opendc-faas:opendc-faas-service") include(":opendc-faas:opendc-faas-simulator") include(":opendc-experiments:opendc-experiments-capelin") -include(":opendc-experiments:opendc-experiments-energy21") include(":opendc-experiments:opendc-experiments-serverless20") include(":opendc-experiments:opendc-experiments-tf20") include(":opendc-web:opendc-web-api") -- cgit v1.2.3 From eef8ea3ab40a4e4a12ba96f839c35c5804884bc1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 13 Sep 2021 12:25:51 +0200 Subject: refactor(capelin): Improve ParquetDataWriter implementation This change improves the ParquetDataWriter class to support more complex use-cases. It now allows subclasses to modify the writer options. In addition to this, a subclass for writing server data is added. --- .../capelin/export/parquet/ParquetDataWriter.kt | 125 ++++++++++++--------- .../capelin/export/parquet/ParquetExportMonitor.kt | 14 ++- .../export/parquet/ParquetHostDataWriter.kt | 74 +++++++----- .../export/parquet/ParquetServerDataWriter.kt | 73 ++++++++++++ .../export/parquet/ParquetServiceDataWriter.kt | 46 ++++---- 5 files changed, 225 insertions(+), 107 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index c5cb80e2..5684bde9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -25,11 +25,13 @@ package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging 
import org.apache.avro.Schema import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.example.Paper.schema import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,50 +40,94 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. */ -public open class ParquetDataWriter( - private val path: File, +abstract class ParquetDataWriter( + path: File, private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { + bufferSize: Int = 4096 +) : AutoCloseable { /** * The logging instance to use. */ private val logger = KotlinLogging.logger {} /** - * The writer to write the Parquet file. + * The queue of commands to process. */ - private val writer = AvroParquetWriter.builder(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) /** - * The queue of commands to process. + * An exception to be propagated to the actual writer. */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + private var exception: Throwable? = null /** * The thread that is responsible for writing the Parquet records. 
*/ - private val writerThread = thread(start = false, name = this.toString()) { run() } + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) /** * Write the specified metrics to the database. */ - public fun write(event: T) { - queue.put(Action.Write(event)) + fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) } /** * Signal the writer to stop. */ - public override fun close() { - queue.put(Action.Stop) + override fun close() { + writerThread.interrupt() writerThread.join() } @@ -90,38 +136,11 @@ public open class ParquetDataWriter( } /** - * Start the writer thread. + * Process the specified [data] to be written to the Parquet file. 
*/ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.data as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write(val data: T) : Action() + private fun process(writer: ParquetWriter, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index 79b84e9d..b057e932 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. 
*/ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val hostWriter = ParquetHostDataWriter( File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + private val serviceWriter = ParquetServiceDataWriter( File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + override fun record(data: ServerData) { + serverWriter.write(data) + } + override fun record(data: HostData) { hostWriter.write(data) } @@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int override fun close() { hostWriter.close() serviceWriter.close() + serverWriter.close() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 8912c12e..7062a275 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -25,6 +25,10 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.compute.service.driver.HostState import 
org.opendc.telemetry.compute.table.HostData import java.io.File @@ -32,42 +36,52 @@ import java.io.File * A Parquet event writer for [HostData]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, schema, convert, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { - override fun toString(): String = "host-writer" + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } - public companion object { - private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> - record.put("host_id", data.host.name) - record.put("state", data.host.state.name) - record.put("timestamp", data.timestamp) - record.put("total_work", data.totalWork) - record.put("granted_work", data.grantedWork) - record.put("overcommitted_work", data.overcommittedWork) - record.put("interfered_work", data.interferedWork) - record.put("cpu_usage", data.cpuUsage) - record.put("cpu_demand", data.cpuDemand) - record.put("power_draw", data.powerDraw) - record.put("instance_count", data.instanceCount) - record.put("cores", data.host.model.cpuCount) - } + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp + builder["host_id"] = data.host.name + builder["powered_on"] = data.host.state == HostState.UP + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + builder["total_work"] = data.totalWork + builder["granted_work"] = data.grantedWork + builder["overcommitted_work"] = data.overcommittedWork + builder["interfered_work"] = data.interferedWork + builder["cpu_usage"] = data.cpuUsage + builder["cpu_demand"] = data.cpuDemand + builder["power_draw"] = data.powerDraw + builder["num_instances"] = data.instanceCount + builder["num_cpus"] = data.host.model.cpuCount + } + + override fun toString(): String = "host-writer" - private val schema: Schema = SchemaBuilder + companion 
object { + private val SCHEMA: Schema = SchemaBuilder .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("requested_work").type().longType().noDefault() - .name("granted_work").type().longType().noDefault() - .name("overcommitted_work").type().longType().noDefault() - .name("interfered_work").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("instance_count").type().intType().noDefault() - .name("cores").type().intType().noDefault() + .requiredLong("timestamp") + .requiredString("host_id") + .requiredBoolean("powered_on") + .requiredLong("uptime") + .requiredLong("downtime") + .requiredDouble("total_work") + .requiredDouble("granted_work") + .requiredDouble("overcommitted_work") + .requiredDouble("interfered_work") + .requiredDouble("cpu_usage") + .requiredDouble("cpu_demand") + .requiredDouble("power_draw") + .requiredInt("num_instances") + .requiredInt("num_cpus") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..9904adde --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without 
limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import java.io.File + +/** + * A Parquet event writer for [ServerData]s. 
+ */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("state", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp + builder["server_id"] = data.server.uid.toString() + builder["state"] = data.server.state + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + builder["num_vcpus"] = data.server.flavor.cpuCount + builder["mem_capacity"] = data.server.flavor.memorySize + } + + override fun toString(): String = "server-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .requiredLong("timestamp") + .requiredString("server_id") + .requiredString("state") + .requiredLong("uptime") + .requiredLong("downtime") + .requiredInt("num_vcpus") + .requiredLong("mem_capacity") + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 36d630f3..e1428fe7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import 
org.opendc.telemetry.compute.table.ServiceData import java.io.File @@ -32,34 +32,34 @@ import java.io.File * A Parquet event writer for [ServiceData]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, schema, convert, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { - override fun toString(): String = "service-writer" + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp + builder["host_total_count"] = data.hostCount + builder["host_available_count"] = data.activeHostCount + builder["instance_total_count"] = data.instanceCount + builder["instance_active_count"] = data.runningInstanceCount + builder["instance_inactive_count"] = data.finishedInstanceCount + builder["instance_waiting_count"] = data.queuedInstanceCount + builder["instance_failed_count"] = data.failedInstanceCount + } - public companion object { - private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> - record.put("timestamp", data.timestamp) - record.put("host_total_count", data.hostCount) - record.put("host_available_count", data.activeHostCount) - record.put("instance_total_count", data.instanceCount) - record.put("instance_active_count", data.runningInstanceCount) - record.put("instance_inactive_count", data.finishedInstanceCount) - record.put("instance_waiting_count", data.queuedInstanceCount) - record.put("instance_failed_count", data.failedInstanceCount) - } + override fun toString(): String = "service-writer" - private val schema: Schema = SchemaBuilder + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("instance_total_count").type().intType().noDefault() - 
.name("instance_active_count").type().intType().noDefault() - .name("instance_inactive_count").type().intType().noDefault() - .name("instance_waiting_count").type().intType().noDefault() - .name("instance_failed_count").type().intType().noDefault() + .requiredLong("timestamp") + .requiredInt("host_total_count") + .requiredInt("host_available_count") + .requiredInt("instance_total_count") + .requiredInt("instance_active_count") + .requiredInt("instance_inactive_count") + .requiredInt("instance_waiting_count") + .requiredInt("instance_failed_count") .endRecord() } } -- cgit v1.2.3 From 5f0b6b372487d79594cf59010822e160f351e0be Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 13 Sep 2021 14:48:02 +0200 Subject: refactor(telemetry): Simplify CoroutineMetricReader This change simplifies the CoroutineMetricReader implementation by removing the separation of reader and exporter jobs. --- .../org/opendc/compute/simulator/SimHostTest.kt | 5 ++- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 3 +- .../sdk/metrics/export/CoroutineMetricReader.kt | 52 +++++++--------------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 31215e9a..9fa8af34 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -46,6 +46,7 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock +import java.time.Duration import java.util.* import kotlin.coroutines.resume @@ -149,7 +150,7 @@ internal class SimHostTest { override fun
shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000L + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -261,7 +262,7 @@ internal class SimHostTest { override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() }, - exportInterval = duration * 1000L + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index d3d983b9..01df0e69 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -33,6 +33,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.ServiceData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.time.Clock +import java.time.Duration /** * Attach the specified monitor to the OpenDC Compute service. 
@@ -42,7 +43,7 @@ public suspend fun withMonitor( clock: Clock, metricProducer: MetricProducer, monitor: ComputeMonitor, - exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */ + exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */ block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { // Monitor host events diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..8f19ab81 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. 
*/ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(1) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + while (isActive) { - delay(exportInterval) + delay(intervalMs) val metrics = mutableListOf() for (producer in producers) { metrics.addAll(producer.collectAllMetrics()) } - chan.send(Collections.unmodifiableList(metrics)) - } - } - /** - * The exporter job runs in the background to actually export the metrics. - */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine { cont -> - try { - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.trace { "Exporter failed" } - } - cont.resume(Unit) + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } } } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } -- cgit v1.2.3 From 3ca64e0110adab65526a0ccfd5b252e9f047ab10 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 14:41:05 +0200 Subject: refactor(telemetry): Create separate MeterProvider per service/host This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. 
This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario. --- .../org/opendc/compute/service/ComputeService.kt | 10 +- .../compute/service/internal/ComputeServiceImpl.kt | 26 +- .../compute/service/internal/InternalServer.kt | 18 ++ .../opendc/compute/service/ComputeServiceTest.kt | 3 +- .../opendc/compute/service/InternalServerTest.kt | 81 +++--- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 43 +-- .../org/opendc/compute/simulator/SimHostTest.kt | 54 ++-- .../experiments/capelin/ExperimentHelpers.kt | 256 ------------------ .../org/opendc/experiments/capelin/Portfolio.kt | 67 ++--- .../export/parquet/ParquetHostDataWriter.kt | 7 +- .../export/parquet/ParquetServerDataWriter.kt | 8 +- .../experiments/capelin/util/ComputeSchedulers.kt | 86 ++++++ .../capelin/util/ComputeServiceSimulator.kt | 222 ++++++++++++++++ .../experiments/capelin/util/FailureModel.kt | 38 +++ .../experiments/capelin/util/FailureModels.kt | 97 +++++++ .../experiments/capelin/CapelinIntegrationTest.kt | 190 +++++++------ .../experiments/serverless/ServerlessExperiment.kt | 3 +- opendc-faas/opendc-faas-service/build.gradle.kts | 1 + .../kotlin/org/opendc/faas/service/FaaSService.kt | 7 +- .../org/opendc/faas/service/FunctionObject.kt | 26 +- .../autoscaler/FunctionTerminationPolicyFixed.kt | 5 +- .../faas/service/internal/FaaSServiceImpl.kt | 8 +- .../org/opendc/faas/service/FaaSServiceTest.kt | 30 +-- .../opendc/faas/simulator/SimFaaSServiceTest.kt | 6 +- .../opendc-telemetry-compute/build.gradle.kts | 1 - .../telemetry/compute/ComputeMetricExporter.kt | 295 +++++++++++++++------ .../org/opendc/telemetry/compute/ComputeMonitor.kt | 14 - .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 44 --- .../org/opendc/telemetry/compute/HostAttributes.kt | 51 ++++ .../org/opendc/telemetry/compute/table/HostData.kt | 4 +- .../org/opendc/telemetry/compute/table/HostInfo.kt | 28 ++ 
.../opendc/telemetry/compute/table/ServerData.kt | 5 +- .../opendc/telemetry/compute/table/ServerInfo.kt | 37 +++ .../src/main/kotlin/org/opendc/web/runner/Main.kt | 72 +++-- .../org/opendc/web/runner/WebComputeMonitor.kt | 40 ++- .../org/opendc/workflow/service/WorkflowService.kt | 8 +- .../service/internal/WorkflowServiceImpl.kt | 8 +- .../opendc/workflow/service/WorkflowServiceTest.kt | 8 +- 38 files changed, 1148 insertions(+), 759 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package 
org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f1c055d4..824becf4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,9 +22,8 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import 
io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -35,6 +34,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -42,15 +42,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -62,6 +65,11 @@ internal class ComputeServiceImpl( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + /** * The [Random] instance used to generate unique identifiers for the objects. */ @@ -365,10 +373,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. 
// We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -414,7 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) + _schedulerDuration.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -49,6 +52,21 @@ internal class InternalServer( */ private val watchers = mutableListOf() + /** + * The attributes of a server. 
+ */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + /** * The [Host] that has been assigned to host the server. */ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index d036ec00..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -61,8 +61,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 28fd8217..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ 
b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,8 +99,8 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) every { service.schedule(any()) } answers { 
ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,8 +158,8 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), 
mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk() val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk(relaxUnitFun = true) @@ -220,8 +221,8 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, 
image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk() - val image = mockk() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index a1cc3390..be6ef11e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -25,6 +25,7 @@ package org.opendc.compute.simulator import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging @@ -59,7 +60,7 @@ public class SimHost( override val meta: Map, context: CoroutineContext, interpreter: SimResourceInterpreter, - private val meter: 
Meter, + meterProvider: MeterProvider, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -81,6 +82,11 @@ public class SimHost( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] to track metrics of the simulated host. + */ + private val meter = meterProvider.get("org.opendc.compute.simulator") + /** * The event listeners registered with this host. */ @@ -142,10 +148,9 @@ public class SimHost( * The total number of guests. */ private val _guests = meter.upDownCounterBuilder("guests.total") - .setDescription("Number of guests") + .setDescription("Total number of guests") .setUnit("1") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The number of active guests on the host. @@ -154,7 +159,6 @@ public class SimHost( .setDescription("Number of active guests") .setUnit("1") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The CPU demand of the host. @@ -163,7 +167,6 @@ public class SimHost( .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") .setUnit("MHz") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The CPU usage of the host. @@ -172,7 +175,6 @@ public class SimHost( .setDescription("The amount of CPU resources used by the host") .setUnit("MHz") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The power usage of the host. @@ -181,7 +183,6 @@ public class SimHost( .setDescription("The amount of power used by the CPU") .setUnit("W") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The total amount of work supplied to the CPU. @@ -191,7 +192,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The work performed by the CPU. 
@@ -201,7 +201,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount not performed by the CPU due to overcommitment. @@ -211,7 +210,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount of work not performed by the CPU due to interference. @@ -221,7 +219,6 @@ public class SimHost( .setUnit("1") .ofDoubles() .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The amount of time in the system. @@ -230,7 +227,6 @@ public class SimHost( .setDescription("The amount of time in the system") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The uptime of the host. @@ -239,7 +235,6 @@ public class SimHost( .setDescription("The uptime of the host") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The downtime of the host. @@ -248,7 +243,6 @@ public class SimHost( .setDescription("The downtime of the host") .setUnit("ms") .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) init { // Launch hypervisor onto machine @@ -390,6 +384,21 @@ public class SimHost( private inner class Guest(val server: Server, val machine: SimMachine) { var state: ServerState = ServerState.TERMINATED + /** + * The attributes of the guest. 
+ */ + val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, server.name) + .put(ResourceAttributes.HOST_ID, server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) + .build() + /** * The amount of time in the system. */ @@ -397,7 +406,7 @@ public class SimHost( .setDescription("The amount of time in the system") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) /** * The uptime of the guest. @@ -406,7 +415,7 @@ public class SimHost( .setDescription("The uptime of the guest") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) /** * The time the guest is in an error state. 
@@ -415,7 +424,7 @@ public class SimHost( .setDescription("The time the guest is in an error state") .setUnit("ms") .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + .bind(attributes) suspend fun start() { when (state) { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 9fa8af34..318b5a5d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -49,6 +49,7 @@ import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration import java.util.* import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * Basic test-suite for the hypervisor. @@ -72,7 +73,7 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L + var totalWork = 0L var grantedWork = 0L var overcommittedWork = 0L @@ -89,7 +90,7 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -134,15 +135,10 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.overcommit"]?.let { - overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } + + totalWork = 
metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() + grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() + overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong() return CompletableResultCode.ofSuccess() } @@ -176,7 +172,7 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4147200, requestedWork, "Requested work does not match") }, + { assertEquals(4147200, totalWork, "Requested work does not match") }, { assertEquals(2107200, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1500001, clock.millis()) } @@ -188,7 +184,7 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var requestedWork = 0L + var totalWork = 0L var grantedWork = 0L var totalTime = 0L var downTime = 0L @@ -208,7 +204,7 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -237,24 +233,14 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["host.time.total"]?.let { - totalTime = it.longSumData.points.first().value - } - metricsByName["host.time.down"]?.let { - downTime = it.longSumData.points.first().value - } - metricsByName["guest.time.total"]?.let { - guestTotalTime = it.longSumData.points.first().value - } - metricsByName["guest.time.error"]?.let { - 
guestDownTime = it.longSumData.points.first().value - } + + totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() + grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() + totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value + downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value + guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value + guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value + return CompletableResultCode.ofSuccess() } @@ -290,8 +276,8 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2226039, requestedWork, "Total time does not match") }, - { assertEquals(1086039, grantedWork, "Down time does not match") }, + { assertEquals(2226040, totalWork, "Total time does not match") }, + { assertEquals(1086040, grantedWork, "Down time does not match") }, { assertEquals(1200001, totalTime, "Total time does not match") }, { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, { assertEquals(5000, downTime, "Down time does not match") }, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, 
modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import 
org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? 
= null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = 
entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - 
weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..f7f9336e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import 
org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. 
*/ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> 
- val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() - monitor.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + "SUBMIT=${monitorResults.instanceCount} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 7062a275..fa00fc35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: HostData) { builder["timestamp"] = data.timestamp - builder["host_id"] = 
data.host.name - builder["powered_on"] = data.host.state == HostState.UP + builder["host_id"] = data.host.id + builder["powered_on"] = true builder["uptime"] = data.uptime builder["downtime"] = data.downtime builder["total_work"] = data.totalWork @@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["cpu_demand"] = data.cpuDemand builder["power_draw"] = data.powerDraw builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.model.cpuCount + builder["num_cpus"] = data.host.cpuCount } override fun toString(): String = "host-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index 9904adde..bb2db4b7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServerData) { builder["timestamp"] = data.timestamp - builder["server_id"] = data.server.uid.toString() - builder["state"] = data.server.state + builder["server_id"] = data.server + // builder["state"] = data.server.state builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["num_vcpus"] = data.server.flavor.cpuCount - builder["mem_capacity"] = data.server.flavor.memorySize + // builder["num_vcpus"] = data.server.flavor.cpuCount + // builder["mem_capacity"] = data.server.flavor.memorySize } override fun toString(): String = "server-writer" diff --git 
a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) 
+ ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The 
above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a 
[ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer]s that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List + get() = _metricProducers + private val _metricProducers = mutableListOf() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ */ + suspend fun run(reader: TraceReader) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server to reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance.
+ */ + private fun createService(scheduler: ComputeScheduler): Pair { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without 
restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. 
+ */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. 
Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index cf88535d..f4cf3e5e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import 
org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* +import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. @@ -59,12 +61,21 @@ class CapelinIntegrationTest { */ private lateinit var monitor: TestExperimentReporter + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + /** * Setup the experimental environment. 
*/ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,26 +83,26 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -106,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { 
assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } @@ -120,27 +131,26 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -151,9 +161,9 @@ class CapelinIntegrationTest { // Note that these 
values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -164,10 +174,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,20 +183,24 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = 
collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -201,10 +211,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -214,39 +224,27 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - 
faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -257,9 +255,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -291,10 +289,10 @@ class CapelinIntegrationTest { var totalPowerDraw = 0.0 override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() + this.totalWork += data.totalWork.roundToLong() + totalGrantedWork += data.grantedWork.roundToLong() + totalOvercommittedWork += data.overcommittedWork.roundToLong() + totalInterferedWork += data.interferedWork.roundToLong() totalPowerDraw += data.powerDraw 
} } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 650416f5..3312d6c0 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.time.Duration import java.util.* import kotlin.math.max @@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") { val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000)) + FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) val client = service.newClient() coroutineScope { diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 63bed8bc..6f4fcc9b 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) 
testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 7e716a34..1d5331cb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer @@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meter The meter to report metrics to. + * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. 
@@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, ): FaaSService { - return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index a1cb1dbf..54df2b59 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter import io.opentelemetry.api.metrics.BoundLongHistogram import io.opentelemetry.api.metrics.BoundLongUpDownCounter import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.faas.service.deployer.FunctionInstance import java.util.* @@ -43,9 +44,14 @@ public class FunctionObject( meta: Map ) : AutoCloseable { /** - * The function identifier attached to the metrics. + * The attributes of this function. */ - private val functionId = AttributeKey.stringKey("function") + public val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.FAAS_ID, uid.toString()) + .put(ResourceAttributes.FAAS_NAME, name) + .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) + .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) + .build() /** * The total amount of function invocations received by the function. 
@@ -54,7 +60,7 @@ public class FunctionObject( .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that could be handled directly. @@ -63,7 +69,7 @@ public class FunctionObject( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that were delayed due to function deployment. @@ -72,7 +78,7 @@ public class FunctionObject( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that failed. @@ -81,7 +87,7 @@ public class FunctionObject( .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of instances for this function. @@ -90,7 +96,7 @@ public class FunctionObject( .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of idle instances for this function. @@ -99,7 +105,7 @@ public class FunctionObject( .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function waited. @@ -109,7 +115,7 @@ public class FunctionObject( .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function was running. 
@@ -119,7 +125,7 @@ public class FunctionObject( .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 1e224ed1..63dbadc7 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext public class FunctionTerminationPolicyFixed( context: CoroutineContext, clock: Clock, - public val timeout: Long + public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. 
*/ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout) { instance.close() } + scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ccf9a5d9..3b560cd3 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + private val meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -69,6 +70,11 @@ internal class FaaSServiceImpl( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] that collects the metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.faas.service") + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. 
*/ diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 6b99684a..1612e10b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,8 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -69,8 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -81,8 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), 
mockk()) val client = service.newClient() @@ -95,8 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -109,8 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -123,8 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -135,8 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -150,8 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, 
MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -163,9 +154,8 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val deployer = mockk() - val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 64f2551b..0dc9ba87 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import java.time.Duration /** * A test suite for the [FaaSService] implementation under simulated conditions. 
@@ -64,14 +65,13 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { override suspend fun invoke() {} }) val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } val service = FaaSService( - coroutineContext, clock, meter, deployer, RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000) + coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) val client = service.newClient() diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 6a3de9bc..cd8cb57a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -31,7 +31,6 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcCompute.opendcComputeSimulator) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 57d43c60..408d1325 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -22,137 +22,260 @@ package org.opendc.telemetry.compute +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import 
io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.driver.Host import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerInfo import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter( - private val clock: Clock, - private val hosts: Map, - private val monitor: ComputeMonitor -) : MetricExporter { - +public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { override fun export(metrics: Collection): CompletableResultCode { return try { - reportHostMetrics(metrics) reportServiceMetrics(metrics) + reportHostMetrics(metrics) + reportServerMetrics(metrics) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() } } - private var lastHostMetrics: Map = emptyMap() - private val hostMetricsSingleton = HBuffer() + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + + private fun reportServiceMetrics(metrics: Collection) { + monitor.record(extractServiceMetrics(clock.millis(), metrics)) + } + + private val hosts = mutableMapOf() + private val servers = mutableMapOf() private fun reportHostMetrics(metrics: Collection) { - val hostMetrics = mutableMapOf() + val hosts = hosts + val servers = servers + + for (metric in metrics) { + val resource = metric.resource + val hostId = resource.attributes[HOST_ID] ?: continue + val agg = 
hosts.computeIfAbsent(hostId) { HostAggregator(resource) } + agg.accept(metric) + } + + val monitor = monitor + val now = clock.millis() + for ((_, server) in servers) { + server.record(monitor, now) + } + } + + private fun reportServerMetrics(metrics: Collection) { + val hosts = hosts for (metric in metrics) { + val resource = metric.resource + val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host } + when (metric.name) { - "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } - "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleHistogram(metric, hostMetrics) { m, v -> m.powerDraw = v } - "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } - "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } - "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } - "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } - "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } - "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } - "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } + "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point -> + agg.schedulingLatency = point.sum / point.count + } + "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point -> + agg.uptime = point.value + } + "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point -> + agg.downtime = point.value + } } } - for ((id, hostMetric) in hostMetrics) { - val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts[id] ?: continue + val monitor = monitor + val now = clock.millis() + for ((_, host) in hosts) { + host.record(monitor, now) + } + } + + /** + * Helper function to map 
a metric by the server. + */ + private inline fun

mapByServer(points: Collection

, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) { + for (point in points) { + val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) } + + if (host != null) { + agg.host = host + } + + block(agg, point) + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostAggregator(resource: Resource) { + /** + * The static information about this host. + */ + val host = HostInfo( + resource.attributes[HOST_ID]!!, + resource.attributes[HOST_NAME]!!, + resource.attributes[HOST_ARCH]!!, + resource.attributes[HOST_NCPUS]!!.toInt(), + resource.attributes[HOST_MEM_CAPACITY]!!, + ) + + private var totalWork: Double = 0.0 + private var previousTotalWork = 0.0 + private var grantedWork: Double = 0.0 + private var previousGrantedWork = 0.0 + private var overcommittedWork: Double = 0.0 + private var previousOvercommittedWork = 0.0 + private var interferedWork: Double = 0.0 + private var previousInterferedWork = 0.0 + private var cpuUsage: Double = 0.0 + private var cpuDemand: Double = 0.0 + private var instanceCount: Int = 0 + private var powerDraw: Double = 0.0 + private var uptime: Long = 0 + private var previousUptime = 0L + private var downtime: Long = 0 + private var previousDowntime = 0L + fun record(monitor: ComputeMonitor, now: Long) { monitor.record( HostData( - clock.millis(), + now, host, - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.instanceCount, - hostMetric.powerDraw, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, + totalWork - previousTotalWork, + grantedWork - previousGrantedWork, + overcommittedWork - 
previousOvercommittedWork, + interferedWork - previousInterferedWork, + cpuUsage, + cpuDemand, + instanceCount, + powerDraw, + uptime - previousUptime, + downtime - previousDowntime, ) ) - } - - lastHostMetrics = hostMetrics - } - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { - val points = data.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) + previousTotalWork = totalWork + previousGrantedWork = grantedWork + previousOvercommittedWork = overcommittedWork + previousInterferedWork = interferedWork + previousUptime = uptime + previousDowntime = downtime + reset() } - } - private fun mapDoubleHistogram(data: MetricData, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { - val points = data.doubleHistogramData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.sum / point.count) + /** + * Accept the [MetricData] for this host. 
+ */ + fun accept(data: MetricData) { + when (data.name) { + "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value + "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value + "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value + "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value + "power.usage" -> powerDraw = acceptHistogram(data) + "cpu.usage" -> cpuUsage = acceptHistogram(data) + "cpu.demand" -> cpuDemand = acceptHistogram(data) + "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt() + "host.time.up" -> uptime = data.longSumData.points.first().value + "host.time.down" -> downtime = data.longSumData.points.first().value + } } - } - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HBuffer, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) + private fun acceptHistogram(data: MetricData): Double { + return when (data.type) { + MetricDataType.HISTOGRAM -> { + val point = data.doubleHistogramData.points.first() + point.sum / point.count + } + MetricDataType.SUMMARY -> { + val point = data.doubleSummaryData.points.first() + point.sum / point.count + } + else -> error("Invalid metric type") + } } - } - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) + private fun reset() { + totalWork = 0.0 + grantedWork = 0.0 + overcommittedWork = 0.0 + interferedWork = 0.0 + cpuUsage = 0.0 + cpuDemand = 0.0 + 
instanceCount = 0 + powerDraw = 0.0 + uptime = 0L + downtime = 0L } } /** - * A buffer for host metrics before they are reported. + * An aggregator for server metrics before they are reported. */ - private class HBuffer { - var totalWork: Double = 0.0 - var grantedWork: Double = 0.0 - var overcommittedWork: Double = 0.0 - var interferedWork: Double = 0.0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var instanceCount: Int = 0 - var powerDraw: Double = 0.0 - var uptime: Long = 0 - var downtime: Long = 0 - } + private class ServerAggregator(attributes: Attributes) { + /** + * The static information about this server. + */ + val server = ServerInfo( + attributes[ResourceAttributes.HOST_ID]!!, + attributes[ResourceAttributes.HOST_NAME]!!, + attributes[ResourceAttributes.HOST_TYPE]!!, + attributes[ResourceAttributes.HOST_ARCH]!!, + attributes[ResourceAttributes.HOST_IMAGE_ID]!!, + attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, + attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), + attributes[AttributeKey.longKey("host.mem_capacity")]!!, + ) - private fun reportServiceMetrics(metrics: Collection) { - monitor.record(extractServiceMetrics(clock.millis(), metrics)) - } + /** + * The [HostInfo] of the host on which the server is hosted. + */ + var host: HostInfo? 
= null - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + @JvmField var uptime: Long = 0 + private var previousUptime = 0L + @JvmField var downtime: Long = 0 + private var previousDowntime = 0L + @JvmField var schedulingLatency = 0.0 - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + fun record(monitor: ComputeMonitor, now: Long) { + monitor.record( + ServerData( + now, + server, + null, + uptime - previousUptime, + downtime - previousDowntime, + ) + ) + + previousUptime = uptime + previousDowntime = downtime + reset() + } + + private fun reset() { + host = null + uptime = 0L + downtime = 0L + } + } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index ec303b37..d51bcab4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,10 +22,6 @@ package org.opendc.telemetry.compute -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData @@ -34,16 +30,6 @@ import org.opendc.telemetry.compute.table.ServiceData * A monitor that tracks the metrics and events of the OpenDC Compute service. */ public interface ComputeMonitor { - /** - * This method is invoked when the state of a [Server] changes. - */ - public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a [Host] changes. 
- */ - public fun onStateChange(time: Long, host: Host, newState: HostState) {} - /** * Record the specified [data]. */ diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 01df0e69..1f309f1b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,51 +24,7 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import java.time.Clock -import java.time.Duration - -/** - * Attach the specified monitor to the OpenDC Compute service. 
- */ -public suspend fun withMonitor( - scheduler: ComputeService, - clock: Clock, - metricProducer: MetricProducer, - monitor: ComputeMonitor, - exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */ - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.onStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.onStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor), - exportInterval - ) - - try { - block(this) - } finally { - reader.close() - } -} /** * Collect the metrics of the compute service. diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt new file mode 100644 index 00000000..7dca6186 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("HostAttributes") +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey + +/** + * The identifier of the node hosting virtual machines. + */ +public val HOST_ID: AttributeKey = AttributeKey.stringKey("node.id") + +/** + * The name of the node hosting virtual machines. + */ +public val HOST_NAME: AttributeKey = AttributeKey.stringKey("node.name") + +/** + * The CPU architecture of the host node. + */ +public val HOST_ARCH: AttributeKey = AttributeKey.stringKey("node.arch") + +/** + * The number of CPUs in the host node. + */ +public val HOST_NCPUS: AttributeKey = AttributeKey.longKey("node.num_cpus") + +/** + * The amount of memory installed in the host node in MiB. + */ +public val HOST_MEM_CAPACITY: AttributeKey = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index 8e6c34d0..e3ecda3d 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -22,14 +22,12 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.service.driver.Host - /** * A trace entry for a particular host. 
*/ public data class HostData( public val timestamp: Long, - public val host: Host, + public val host: HostInfo, public val totalWork: Double, public val grantedWork: Double, public val overcommittedWork: Double, diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt new file mode 100644 index 00000000..d9a5906b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Information about a host exposed to the telemetry service. 
+ */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 2a9fa8a6..7fde86d9 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -22,14 +22,13 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.api.Server - /** * A trace entry for a particular server. */ public data class ServerData( public val timestamp: Long, - public val server: Server, + public val server: ServerInfo, + public val host: HostInfo?, public val uptime: Long, public val downtime: Long, ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt new file mode 100644 index 00000000..b16e5f3d --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or 
substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b565e90d..b9d5a3f5 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,10 +28,8 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.env.MachineDef @@ -39,6 +37,8 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import 
org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -46,8 +46,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -55,9 +56,8 @@ import org.opendc.web.client.model.Scenario import org.opendc.web.client.model.Topology import java.io.File import java.net.URI +import java.time.Duration import java.util.* -import kotlin.random.Random -import kotlin.random.asJavaRandom import org.opendc.web.client.model.Portfolio as ClientPortfolio private val logger = KotlinLogging.logger {} @@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -182,63 +182,55 @@ class RunnerCli : CliktCommand(name = "runner") { try { runBlockingSimulation { - val seed = repeat val 
workloadName = scenario.trace.traceId val workloadFraction = scenario.trace.loadSamplingFraction - val seeder = Random(seed) + val seeder = Random(repeat.toLong()) val meterProvider: MeterProvider = SdkMeterProvider .builder() .setClock(clock.toOtelClock()) .build() - val metricProducer = meterProvider as MetricProducer val operational = scenario.operationalPhenomena - val allocationPolicy = createComputeScheduler(operational.schedulerName, seeder) + val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) val trace = ParquetTraceReader( listOf(traceReader), Workload(workloadName, workloadFraction), - seed + repeat ) - val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0 - - withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> - val faultInjector = if (failureFrequency > 0) { - logger.debug { "ENABLING failures" } - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - failureFrequency, - ) - } else { + val failureModel = + if (operational.failuresEnabled) + grid5000(Duration.ofDays(7), repeat) + else null - } - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + interferenceModel.takeIf { operational.performanceInterferenceEnabled } + ) - processTrace( - clock, - trace, - scheduler, - monitor - ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - faultInjector?.close() - } + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + - 
"SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index c8e58dde..4b813310 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -22,27 +22,19 @@ package org.opendc.web.runner -import mu.KotlinLogging -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServiceData import kotlin.math.max +import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. 
*/ -public class WebComputeMonitor : ComputeMonitor { - private val logger = KotlinLogging.logger {} - - override fun onStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - +class WebComputeMonitor : ComputeMonitor { override fun record(data: HostData) { - val duration = 5 * 60 * 1000L - val slices = duration / SLICE_LENGTH + val duration = data.uptime + val slices = data.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( hostAggregateMetrics.totalWork + data.totalWork, @@ -50,14 +42,14 @@ public class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices ) - hostMetrics.compute(data.host) { _, prev -> + hostMetrics.compute(data.host.id) { _, prev -> HostMetrics( - (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), data.instanceCount + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) @@ -65,7 +57,7 @@ public class WebComputeMonitor : ComputeMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() + private val hostMetrics: MutableMap = mutableMapOf() private val SLICE_LENGTH: Long = 5 * 60 * 1000 data class AggregateHostMetrics( 
@@ -74,8 +66,8 @@ public class WebComputeMonitor : ComputeMonitor { val totalOvercommittedWork: Double = 0.0, val totalInterferedWork: Double = 0.0, val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, ) data class HostMetrics( @@ -97,7 +89,7 @@ public class WebComputeMonitor : ComputeMonitor { ) } - public data class AggregateServiceMetrics( + data class AggregateServiceMetrics( val vmTotalCount: Int = 0, val vmWaitingCount: Int = 0, val vmActiveCount: Int = 0, @@ -105,7 +97,7 @@ public class WebComputeMonitor : ComputeMonitor { val vmFailedCount: Int = 0 ) - public fun getResult(): Result { + fun getResult(): Result { return Result( hostAggregateMetrics.totalWork, hostAggregateMetrics.totalGrantedWork, @@ -116,8 +108,8 @@ public class WebComputeMonitor : ComputeMonitor { hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), serviceMetrics.vmTotalCount, serviceMetrics.vmWaitingCount, serviceMetrics.vmInactiveCount, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index d3358ef1..a0248a93 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.Meter 
+import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. - * @param meter The meter to use. + * @param meterProvider The meter provider to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meter, + meterProvider, compute, mode, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 5329143d..a0fd3fad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.flow.map import mu.KotlinLogging @@ -48,7 +49,7 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, 
- private val meter: Meter, + meterProvider: MeterProvider, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -66,6 +67,11 @@ public class WorkflowServiceImpl( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] to collect metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.workflow.service") + /** * The incoming jobs ready to be processed by the scheduler. */ diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 07433d1f..74316437 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.time.Duration import java.util.* /** @@ -79,24 +80,23 @@ internal class WorkflowServiceTest { emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + MeterProvider.noop(), hvProvider, ) } - val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) 
hosts.forEach { compute.addHost(it) } val scheduler = WorkflowService( coroutineContext, clock, - meterProvider.get("opendc-workflow"), + meterProvider, compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, -- cgit v1.2.3 From 8d899e29dbd757f6df320212d6e0d77ce8216ab9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 15:38:38 +0200 Subject: refactor(telemetry): Standardize compute scheduler metrics This change updates the OpenDC compute service implementation with multiple meters that follow the OpenTelemetry conventions. --- .../compute/service/internal/ComputeServiceImpl.kt | 137 +++++++++------------ .../org/opendc/experiments/capelin/Portfolio.kt | 11 +- .../export/parquet/ParquetServiceDataWriter.kt | 28 ++--- .../experiments/capelin/CapelinIntegrationTest.kt | 52 ++++---- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 59 +++++---- .../opendc/telemetry/compute/table/ServiceData.kt | 14 +-- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 11 +- .../org/opendc/web/runner/WebComputeMonitor.kt | 10 +- 8 files changed, 162 insertions(+), 160 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 824becf4..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,6 +22,8 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* @@ -114,69 +116,37 @@ internal class ComputeServiceImpl( 
private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. + * The number of scheduling attempts. */ - private val _submittedServers = meter.counterBuilder("servers.submitted") - .setDescription("Number of start requests") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _runningServers = meter.upDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") - .setUnit("1") - .build() - - /** - * The number of servers that have finished running. - */ - private val _finishedServers = meter.counterBuilder("servers.finished") - .setDescription("Number of servers that finished running") - .setUnit("1") - .build() - - /** - * The number of hosts registered at the compute service. + * The response time of the service. 
*/ - private val _hostCount = meter.upDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of available hosts registered at the compute service. + * The number of servers that are pending. */ - private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() - - /** - * The response time of the service. - */ - private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. 
@@ -189,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -316,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -346,8 +327,7 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -395,7 +375,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -407,10 +387,10 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit 
[${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } server.state = ServerState.TERMINATED continue @@ -423,8 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, server.attributes) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -439,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -463,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -498,16 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.ERROR) { - 
_runningServers.add(-1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f7f9336e..3ec424f1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -153,11 +153,12 @@ abstract class Portfolio(name: String) : Experiment(name) { val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.activeHostCount}" + "Scheduler " + + "Success=${monitorResults.attemptsSuccess} " + + "Failure=${monitorResults.attemptsFailure} " + + "Error=${monitorResults.attemptsError} " + + "Pending=${monitorResults.serversPending} " + + "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index e1428fe7..29b48878 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -36,13 +36,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServiceData) { builder["timestamp"] = data.timestamp - builder["host_total_count"] = data.hostCount - builder["host_available_count"] = data.activeHostCount - builder["instance_total_count"] = data.instanceCount - builder["instance_active_count"] = data.runningInstanceCount - builder["instance_inactive_count"] = data.finishedInstanceCount - builder["instance_waiting_count"] = data.queuedInstanceCount - builder["instance_failed_count"] = data.failedInstanceCount + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError } override fun toString(): String = "service-writer" @@ -53,13 +53,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .namespace("org.opendc.telemetry.compute") .fields() .requiredLong("timestamp") - .requiredInt("host_total_count") - .requiredInt("host_available_count") - .requiredInt("instance_total_count") - .requiredInt("instance_active_count") - .requiredInt("instance_inactive_count") - .requiredInt("instance_waiting_count") - .requiredInt("instance_failed_count") + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + 
.requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index f4cf3e5e..81405acf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -104,19 +104,20 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be 
unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -152,11 +153,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -202,11 +204,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -246,11 +249,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - 
"QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 1f309f1b..f3690ee8 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,6 +22,7 @@ package org.opendc.telemetry.compute +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData @@ -37,32 +38,48 @@ public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer * Extract a [ServiceData] object from the specified list of metric data. 
*/ public fun extractServiceMetrics(timestamp: Long, metrics: Collection): ServiceData { - var submittedVms = 0 - var queuedVms = 0 - var unscheduledVms = 0 - var runningVms = 0 - var finishedVms = 0 - var hosts = 0 - var availableHosts = 0 + val resultKey = AttributeKey.stringKey("result") + val stateKey = AttributeKey.stringKey("state") - for (metric in metrics) { - val points = metric.longSumData.points + var hostsUp = 0 + var hostsDown = 0 - if (points.isEmpty()) { - continue - } + var serversPending = 0 + var serversActive = 0 - val value = points.first().value.toInt() + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + + for (metric in metrics) { when (metric.name) { - "servers.submitted" -> submittedVms = value - "servers.waiting" -> queuedVms = value - "servers.unscheduled" -> unscheduledVms = value - "servers.active" -> runningVms = value - "servers.finished" -> finishedVms = value - "hosts.total" -> hosts = value - "hosts.available" -> availableHosts = value + "scheduler.hosts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[stateKey]) { + "up" -> hostsUp = point.value.toInt() + "down" -> hostsDown = point.value.toInt() + } + } + } + "scheduler.servers" -> { + for (point in metric.longSumData.points) { + when (point.attributes[stateKey]) { + "pending" -> serversPending = point.value.toInt() + "active" -> serversActive = point.value.toInt() + } + } + } + "scheduler.attempts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[resultKey]) { + "success" -> attemptsSuccess = point.value.toInt() + "failure" -> attemptsFailure = point.value.toInt() + "error" -> attemptsError = point.value.toInt() + } + } + } } } - return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms) + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } diff --git 
a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index f6ff5db5..da2ebdf4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -27,11 +27,11 @@ package org.opendc.telemetry.compute.table */ public data class ServiceData( public val timestamp: Long, - public val hostCount: Int, - public val activeHostCount: Int, - public val instanceCount: Int, - public val runningInstanceCount: Int, - public val finishedInstanceCount: Int, - public val queuedInstanceCount: Int, - public val failedInstanceCount: Int + public val hostsUp: Int, + public val hostsDown: Int, + public val serversPending: Int, + public val serversActive: Int, + public val attemptsSuccess: Int, + public val attemptsFailure: Int, + public val attemptsError: Int ) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index b9d5a3f5..960d5ebd 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -226,11 +226,12 @@ class RunnerCli : CliktCommand(name = "runner") { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + 
"Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" } } } catch (cause: Throwable) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index 4b813310..5f2c474b 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -81,11 +81,11 @@ class WebComputeMonitor : ComputeMonitor { override fun record(data: ServiceData) { serviceMetrics = AggregateServiceMetrics( - max(data.instanceCount, serviceMetrics.vmTotalCount), - max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount), - max(data.runningInstanceCount, serviceMetrics.vmActiveCount), - max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount), - max(data.failedInstanceCount, serviceMetrics.vmFailedCount), + max(data.attemptsSuccess, serviceMetrics.vmTotalCount), + max(data.serversPending, serviceMetrics.vmWaitingCount), + max(data.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(data.attemptsFailure, serviceMetrics.vmFailedCount), ) } -- cgit v1.2.3 From 0d8bccc68705d036fbf60f312d9c34ca4392c6b2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 7 Sep 2021 17:30:46 +0200 Subject: refactor(telemetry): Standardize SimHost metrics This change standardizes the metrics emitted by SimHost instances and their guests based on the OpenTelemetry semantic conventions. We now also report CPU time as opposed to CPU work as this metric is more commonly used. 
--- .../opendc-compute-simulator/build.gradle.kts | 1 + .../kotlin/org/opendc/compute/simulator/SimHost.kt | 504 ++++++++++----------- .../org/opendc/compute/simulator/internal/Guest.kt | 305 +++++++++++++ .../compute/simulator/internal/GuestListener.kt | 38 ++ .../org/opendc/compute/simulator/SimHostTest.kt | 117 ++--- .../org/opendc/experiments/capelin/Portfolio.kt | 3 +- .../capelin/export/parquet/ParquetDataWriter.kt | 1 - .../export/parquet/ParquetHostDataWriter.kt | 56 ++- .../export/parquet/ParquetServerDataWriter.kt | 38 +- .../export/parquet/ParquetServiceDataWriter.kt | 2 +- .../experiments/capelin/CapelinIntegrationTest.kt | 66 +-- .../opendc/simulator/compute/SimAbstractMachine.kt | 8 +- .../simulator/compute/kernel/SimHypervisor.kt | 2 +- .../simulator/compute/kernel/SimHypervisorTest.kt | 8 +- .../telemetry/compute/ComputeMetricAggregator.kt | 448 ++++++++++++++++++ .../telemetry/compute/ComputeMetricExporter.kt | 243 +--------- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 55 +-- .../org/opendc/telemetry/compute/table/HostData.kt | 33 +- .../opendc/telemetry/compute/table/ServerData.kt | 19 +- .../opendc/telemetry/compute/table/ServiceData.kt | 18 +- .../sdk/metrics/export/CoroutineMetricReader.kt | 2 +- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 12 +- .../org/opendc/web/runner/ScenarioManager.kt | 8 +- .../org/opendc/web/runner/WebComputeMonitor.kt | 41 +- 24 files changed, 1285 insertions(+), 743 deletions(-) create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt create mode 100644 opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index 
cad051e6..aaf69f78 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -40,5 +40,6 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index be6ef11e..793db907 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -26,13 +26,16 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimHypervisorProvider @@ -44,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import 
org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.resources.SimResourceDistributorMaxMin import org.opendc.simulator.resources.SimResourceInterpreter import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlin.math.roundToLong /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -100,32 +103,29 @@ public class SimHost( /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( + private val hypervisor: SimHypervisor = hypervisor.create( interpreter, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - _totalWork.add(requestedWork) - _grantedWork.add(grantedWork) - _overcommittedWork.add(overcommittedWork) - _interferedWork.add(interferedWork) - _cpuDemand.record(cpuDemand) - _cpuUsage.record(cpuUsage) - _powerUsage.record(machine.powerDraw) - - reportTime() + _cpuDemand = cpuDemand + _cpuUsage = cpuUsage + + collectTime() } } ) + private var _cpuUsage = 0.0 + private var _cpuDemand = 0.0 /** * The virtual machines running on the hypervisor. @@ -145,109 +145,23 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The total number of guests. - */ - private val _guests = meter.upDownCounterBuilder("guests.total") - .setDescription("Total number of guests") - .setUnit("1") - .build() - - /** - * The number of active guests on the host. 
- */ - private val _activeGuests = meter.upDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - - /** - * The CPU demand of the host. - */ - private val _cpuDemand = meter.histogramBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - - /** - * The CPU usage of the host. - */ - private val _cpuUsage = meter.histogramBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - - /** - * The power usage of the host. - */ - private val _powerUsage = meter.histogramBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - - /** - * The total amount of work supplied to the CPU. - */ - private val _totalWork = meter.counterBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The work performed by the CPU. - */ - private val _grantedWork = meter.counterBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount not performed by the CPU due to overcommitment. - */ - private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount of work not performed by the CPU due to interference. - */ - private val _interferedWork = meter.counterBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .ofDoubles() - .build() - - /** - * The amount of time in the system. + * The [GuestListener] that listens for guest events. 
*/ - private val _totalTime = meter.counterBuilder("host.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - - /** - * The uptime of the host. - */ - private val _upTime = meter.counterBuilder("host.time.up") - .setDescription("The uptime of the host") - .setUnit("ms") - .build() + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The downtime of the host. - */ - private val _downTime = meter.counterBuilder("host.time.down") - .setDescription("The downtime of the host") - .setUnit("ms") - .build() + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } init { // Launch hypervisor onto machine scope.launch { try { + _bootTime = clock.millis() _state = HostState.UP machine.run(this@SimHost.hypervisor, emptyMap()) } catch (_: CancellationException) { @@ -259,29 +173,48 @@ public class SimHost( _state = HostState.DOWN } } - } - - private var _lastReport = clock.millis() - - private fun reportTime() { - if (!scope.isActive) - return - - val now = clock.millis() - val duration = now - _lastReport - - _totalTime.add(duration) - when (_state) { - HostState.UP -> _upTime.add(duration) - HostState.DOWN -> _downTime.add(duration) - } - // Track time of guests - for (guest in guests.values) { - guest.reportTime() - } - - _lastReport = now + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { 
result -> result.observe(_cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(_cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerDraw) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback(::collectPowerTotal) + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectTime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -295,8 +228,17 @@ public class SimHost( override suspend fun spawn(server: Server, start: Boolean) { val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - _guests.add(1) - Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name)) + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) } if (start) { @@ -320,7 +262,6 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return - _guests.add(-1) guest.terminate() } @@ -333,7 +274,7 @@ public class SimHost( } override fun close() { 
- reportTime() + reset() scope.cancel() machine.close() } @@ -341,21 +282,34 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { - reportTime() - _state = HostState.DOWN + reset() + for (guest in guests.values) { guest.fail() } } public suspend fun recover() { - reportTime() + collectTime() _state = HostState.UP + _bootTime = clock.millis() + for (guest in guests.values) { guest.start() } } + /** + * Reset the machine. + */ + private fun reset() { + collectTime() + + _state = HostState.DOWN + _cpuUsage = 0.0 + _cpuDemand = 0.0 + } + /** * Convert flavor to machine model. */ @@ -368,162 +322,168 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStart(vm: Guest) { - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val STATE_KEY = AttributeKey.stringKey("state") - private fun onGuestStop(vm: Guest) { - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") /** - * A virtual machine instance that the driver manages. + * Helper function to collect the guest counts on this host. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - var state: ServerState = ServerState.TERMINATED - - /** - * The attributes of the guest. 
- */ - val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.HOST_NAME, server.name) - .put(ResourceAttributes.HOST_ID, server.uid.toString()) - .put(ResourceAttributes.HOST_TYPE, server.flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) - .build() - - /** - * The amount of time in the system. - */ - private val _totalTime = meter.counterBuilder("guest.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - .bind(attributes) - - /** - * The uptime of the guest. - */ - private val _runningTime = meter.counterBuilder("guest.time.running") - .setDescription("The uptime of the guest") - .setUnit("ms") - .build() - .bind(attributes) - - /** - * The time the guest is in an error state. 
- */ - private val _errorTime = meter.counterBuilder("guest.time.error") - .setDescription("The time the guest is in an error state") - .setUnit("ms") - .build() - .bind(attributes) - - suspend fun start() { - when (state) { - ServerState.TERMINATED, ServerState.ERROR -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + for ((_, guest) in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + else -> invalid++ } } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) + } - suspend fun terminate() { - stop() - machine.close() - state = ServerState.DELETED + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. 
+ */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) + + for (guest in guests.values) { + guest.collectCpuLimit(result) } + } - suspend fun fail() { - if (state != ServerState.RUNNING) { - return - } - stop() - state = ServerState.ERROR + private var _lastCpuTimeCallback = clock.millis() + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(result: ObservableLongMeasurement) { + val now = clock.millis() + val duration = now - _lastCpuTimeCallback + + try { + collectCpuTime(duration, result) + } finally { + _lastCpuTimeCallback = now } + } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - try { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - } catch (e: Throwable) { - cont.resumeWithException(e) - } - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - job = null - } - } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = this.model.cpuCount + val d = coreCount / _cpuLimit + + val counters = hypervisor.counters + val grantedWork = counters.actual + val overcommittedWork = counters.overcommit + val interferedWork = (counters as? 
SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0 + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + val lostTime = (interferedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(lostTime, _lostState) + + for (guest in guests.values) { + guest.collectCpuTime(duration, result) } + } + + private var _lastPowerCallback = clock.millis() + private var _totalPower = 0.0 - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + /** + * Helper function to collect the total power usage of the machine. + */ + private fun collectPowerTotal(result: ObservableDoubleMeasurement) { + val now = clock.millis() + val duration = now - _lastPowerCallback + + _totalPower += duration / 1000.0 * machine.powerDraw + result.observe(_totalPower) + + _lastPowerCallback = now + } + + private var _lastReport = clock.millis() + + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(result: ObservableLongMeasurement? = null) { + val now = clock.millis() + val duration = now - _lastReport + + try { + collectTime(duration, result) + } finally { + _lastReport = now } + } - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - onGuestStop(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(duration: Long, result: ObservableLongMeasurement? 
= null) { + if (state == HostState.UP) { + _uptime += duration + } else if (state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var _lastReport = clock.millis() + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) - fun reportTime() { - if (state == ServerState.DELETED) - return + for (guest in guests.values) { + guest.collectUptime(duration, result) + } + } - val now = clock.millis() - val duration = now - _lastReport + private var _bootTime = Long.MIN_VALUE - _totalTime.add(duration) - when (state) { - ServerState.RUNNING -> _runningTime.add(duration) - ServerState.ERROR -> _errorTime.add(duration) - else -> {} - } + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } - _lastReport = now + for (guest in guests.values) { + guest.collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..90562e2f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice 
and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.SimAbstractMachine +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. 
+ */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, server.name) + .put(ResourceAttributes.HOST_ID, server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) + .build() + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Terminate the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. 
+ */ + suspend fun terminate() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Run the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. 
+ */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "up") + .build() + private val _downState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } + } + + private val _activeState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "active") + .build() + private val _stealState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "steal") + .build() + private val _lostState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "lost") + .build() + private val _idleState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "idle") + .build() + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. 
+ */ + fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = server.flavor.cpuCount + val d = coreCount / _cpuLimit + + var grantedWork = 0.0 + var overcommittedWork = 0.0 + + for (cpu in (machine as SimAbstractMachine).cpus) { + val counters = cpu.counters + grantedWork += counters.actual + overcommittedWork += counters.overcommit + } + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(0, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission 
notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 318b5a5d..9c879e5e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,11 +23,9 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -44,17 +42,20 @@ import 
org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration import java.util.* import kotlin.coroutines.resume -import kotlin.math.roundToLong /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { private lateinit var machineModel: MachineModel @@ -73,18 +74,23 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var totalWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val virtDriver = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = "test", model = machineModel, meta = emptyMap(), @@ -132,20 +138,16 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() - grantedWork = 
metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() - overcommittedWork = metricsByName.getValue("cpu.work.overcommit").doubleSumData.points.first().value.roundToLong() - return CompletableResultCode.ofSuccess() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime + } } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, + ), exportInterval = Duration.ofSeconds(duration) ) @@ -172,9 +174,9 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4147200, totalWork, "Requested work does not match") }, - { assertEquals(2107200, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(659, activeTime, "Active time does not match") }, + { assertEquals(2342, idleTime, "Idle time does not match") }, + { assertEquals(638, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -184,21 +186,26 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var totalWork = 0L - var grantedWork = 0L - var totalTime = 0L - var downTime = 0L - var guestTotalTime = 0L - var guestDownTime = 0L - + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val host = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = 
"test", model = machineModel, meta = emptyMap(), @@ -230,24 +237,22 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - totalWork = metricsByName.getValue("cpu.work.total").doubleSumData.points.first().value.roundToLong() - grantedWork = metricsByName.getValue("cpu.work.granted").doubleSumData.points.first().value.roundToLong() - totalTime = metricsByName.getValue("host.time.total").longSumData.points.first().value - downTime = metricsByName.getValue("host.time.down").longSumData.points.first().value - guestTotalTime = metricsByName.getValue("guest.time.total").longSumData.points.first().value - guestDownTime = metricsByName.getValue("guest.time.error").longSumData.points.first().value - - return CompletableResultCode.ofSuccess() - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime + } + } + ), exportInterval = Duration.ofSeconds(duration) ) @@ -276,12 +281,12 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2226040, totalWork, "Total time does not match") }, - { assertEquals(1086040, grantedWork, "Down time does not match") }, - { assertEquals(1200001, totalTime, "Total time does not match") }, - { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, - { assertEquals(5000, downTime, "Down time does not match") }, - { assertEquals(5000, 
guestDownTime, "Guest down time does not match") }, + { assertEquals(2661, idleTime, "Idle time does not match") }, + { assertEquals(339, activeTime, "Active time does not match") }, + { assertEquals(1195001, uptime, "Uptime does not match") }, + { assertEquals(5000, downtime, "Downtime does not match") }, + { assertEquals(1195000, guestUptime, "Guest uptime does not match") }, + { assertEquals(5000, guestDowntime, "Guest downtime does not match") }, ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 3ec424f1..6261ebbf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -149,9 +149,10 @@ abstract class Portfolio(name: String) : Experiment(name) { } finally { simulator.close() metricReader.close() + monitor.close() } - val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index 5684bde9..e3d15c3b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -27,7 +27,6 @@ import org.apache.avro.Schema import 
org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.example.Paper.schema import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index fa00fc35..36207045 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -44,20 +44,31 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : } override fun convert(builder: GenericRecordBuilder, data: HostData) { - builder["timestamp"] = data.timestamp + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["host_id"] = data.host.id - builder["powered_on"] = true + builder["num_cpus"] = data.host.cpuCount + builder["mem_capacity"] = data.host.memCapacity + builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["total_work"] = data.totalWork - builder["granted_work"] = data.grantedWork - builder["overcommitted_work"] = data.overcommittedWork - builder["interfered_work"] = data.interferedWork - builder["cpu_usage"] = data.cpuUsage - builder["cpu_demand"] = data.cpuDemand - builder["power_draw"] = data.powerDraw - builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.cpuCount + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + + builder["cpu_limit"] = data.cpuLimit + 
builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid } override fun toString(): String = "host-writer" @@ -69,18 +80,21 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .fields() .requiredLong("timestamp") .requiredString("host_id") - .requiredBoolean("powered_on") + .requiredInt("num_cpus") + .requiredLong("mem_capacity") .requiredLong("uptime") .requiredLong("downtime") - .requiredDouble("total_work") - .requiredDouble("granted_work") - .requiredDouble("overcommitted_work") - .requiredDouble("interfered_work") - .requiredDouble("cpu_usage") - .requiredDouble("cpu_demand") - .requiredDouble("power_draw") - .requiredInt("num_instances") - .requiredInt("num_cpus") + .optionalLong("boot_time") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index bb2db4b7..c5a5e7c0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -40,18 +40,31 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { return builder .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("state", true) + .withDictionaryEncoding("host_id", true) .build() } override fun convert(builder: GenericRecordBuilder, data: ServerData) { - builder["timestamp"] = data.timestamp - builder["server_id"] = data.server - // builder["state"] = data.server.state + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + builder["num_vcpus"] = data.server.cpuCount + builder["mem_capacity"] = data.server.memCapacity + builder["uptime"] = data.uptime builder["downtime"] = data.downtime - // builder["num_vcpus"] = data.server.flavor.cpuCount - // builder["mem_capacity"] = data.server.flavor.memorySize + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime } override fun toString(): String = "server-writer" @@ -63,11 +76,18 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .fields() .requiredLong("timestamp") .requiredString("server_id") - .requiredString("state") - .requiredLong("uptime") - .requiredLong("downtime") + .optionalString("host_id") .requiredInt("num_vcpus") .requiredLong("mem_capacity") + .requiredLong("uptime") + .requiredLong("downtime") + .optionalLong("boot_time") + .requiredLong("scheduling_latency") + .requiredDouble("cpu_limit") + 
.requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 29b48878..d9ca55cb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -35,7 +35,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : ParquetDataWriter(path, SCHEMA, bufferSize) { override fun convert(builder: GenericRecordBuilder, data: ServiceData) { - builder["timestamp"] = data.timestamp + builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown builder["servers_pending"] = data.serversPending diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 81405acf..727530e3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -50,7 +50,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration import java.util.* -import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. 
@@ -102,7 +101,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -118,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -151,7 +150,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -163,10 +162,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, - { 
assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -202,7 +201,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -214,10 +213,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, - { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -247,7 +246,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + 
"Success=${serviceMetrics.attemptsSuccess} " + @@ -259,10 +258,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } ) } @@ -286,18 +286,20 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ComputeMonitor { - var totalWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - var totalInterferedWork = 0L - var totalPowerDraw = 0.0 + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L override fun record(data: HostData) { - this.totalWork += data.totalWork.roundToLong() - totalGrantedWork += data.grantedWork.roundToLong() - totalOvercommittedWork += data.overcommittedWork.roundToLong() - totalInterferedWork += data.interferedWork.roundToLong() - totalPowerDraw += data.powerDraw + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt 
b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 266db0dd..f9db048d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -49,22 +49,22 @@ public abstract class SimAbstractMachine( /** * The resources allocated for this machine. */ - protected abstract val cpus: List + public abstract val cpus: List /** * The memory interface of the machine. */ - protected val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory) + public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory) /** * The network interfaces available to the machine. */ - protected val net: List = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) } + public val net: List = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) } /** * The network interfaces available to the machine. */ - protected val storage: List = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) } + public val storage: List = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) } /** * The peripherals of the machine. 
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index af28c346..3b49d515 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -64,7 +64,7 @@ public interface SimHypervisor : SimWorkload { */ public fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 8dea0045..1f010338 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -70,14 +70,14 @@ internal class SimHypervisorTest { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - totalRequestedWork += requestedWork + totalRequestedWork += totalWork totalGrantedWork += grantedWork totalOvercommittedWork += overcommittedWork } @@ -128,14 +128,14 @@ internal class SimHypervisorTest { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - totalRequestedWork += requestedWork + 
totalRequestedWork += totalWork totalGrantedWork += grantedWork totalOvercommittedWork += overcommittedWork } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt new file mode 100644 index 00000000..e9449634 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -0,0 +1,448 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
package org.opendc.telemetry.compute

import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.data.PointData
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.telemetry.compute.table.*
import java.time.Instant
import kotlin.math.roundToLong

/**
 * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
 *
 * Usage is a two-step cycle: feed the metrics of an export into [process], then call [collect] to emit one record
 * per known service/host/server to a [ComputeMonitor]. Cumulative counters (CPU time, energy, uptime, downtime) are
 * reported as the difference with respect to the previous [collect] call, whereas instantaneous values (guest
 * counts, CPU limit/usage/demand/utilization, power usage) are reset to zero after every [collect].
 */
public class ComputeMetricAggregator {
    private val _service = ServiceAggregator()
    private val _hosts = mutableMapOf<String, HostAggregator>()
    private val _servers = mutableMapOf<String, ServerAggregator>()

    /**
     * Process the specified [metrics] for this cycle.
     *
     * Metrics with an unknown name are ignored. Host metrics whose resource does not carry a [HOST_ID] attribute
     * are skipped, and metrics without any data point leave the aggregated value untouched.
     */
    public fun process(metrics: Collection<MetricData>) {
        val service = _service
        val hosts = _hosts
        val servers = _servers

        for (metric in metrics) {
            val resource = metric.resource

            when (metric.name) {
                // ComputeService
                "scheduler.hosts" -> {
                    for (point in metric.longSumData.points) {
                        when (point.attributes[STATE_KEY]) {
                            "up" -> service.hostsUp = point.value.toInt()
                            "down" -> service.hostsDown = point.value.toInt()
                        }
                    }
                }
                "scheduler.servers" -> {
                    for (point in metric.longSumData.points) {
                        when (point.attributes[STATE_KEY]) {
                            "pending" -> service.serversPending = point.value.toInt()
                            "active" -> service.serversActive = point.value.toInt()
                        }
                    }
                }
                "scheduler.attempts" -> {
                    for (point in metric.longSumData.points) {
                        when (point.attributes[RESULT_KEY]) {
                            "success" -> service.attemptsSuccess = point.value.toInt()
                            "failure" -> service.attemptsFailure = point.value.toInt()
                            "error" -> service.attemptsError = point.value.toInt()
                        }
                    }
                }
                "scheduler.latency" -> {
                    for (point in metric.doubleHistogramData.points) {
                        val server = getServer(servers, point) ?: continue

                        // A histogram without observations has no mean: sum / count would be NaN and
                        // roundToLong() throws on NaN, so keep the previously known latency instead.
                        if (point.count > 0) {
                            server.schedulingLatency = (point.sum / point.count).roundToLong()
                        }
                    }
                }

                // SimHost
                "system.guests" -> {
                    val agg = getHost(hosts, resource) ?: continue

                    for (point in metric.longSumData.points) {
                        when (point.attributes[STATE_KEY]) {
                            "terminated" -> agg.guestsTerminated = point.value.toInt()
                            "running" -> agg.guestsRunning = point.value.toInt()
                            // Guests in error state are tracked separately from running guests.
                            "error" -> agg.guestsError = point.value.toInt()
                            "invalid" -> agg.guestsInvalid = point.value.toInt()
                        }
                    }
                }
                "system.cpu.limit" -> {
                    val agg = getHost(hosts, resource) ?: continue

                    for (point in metric.doubleGaugeData.points) {
                        // Points that identify a server belong to a guest, all other points to the host itself.
                        val server = getServer(servers, point)

                        if (server != null) {
                            server.cpuLimit = point.value
                            server.host = agg.host
                        } else {
                            agg.cpuLimit = point.value
                        }
                    }
                }
                "system.cpu.usage" -> {
                    val agg = getHost(hosts, resource) ?: continue
                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuUsage = it.value }
                }
                "system.cpu.demand" -> {
                    val agg = getHost(hosts, resource) ?: continue
                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuDemand = it.value }
                }
                "system.cpu.utilization" -> {
                    val agg = getHost(hosts, resource) ?: continue
                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuUtilization = it.value }
                }
                "system.cpu.time" -> {
                    val agg = getHost(hosts, resource) ?: continue

                    for (point in metric.longSumData.points) {
                        val server = getServer(servers, point)
                        val state = point.attributes[STATE_KEY]
                        if (server != null) {
                            when (state) {
                                "active" -> server.cpuActiveTime = point.value
                                "idle" -> server.cpuIdleTime = point.value
                                "steal" -> server.cpuStealTime = point.value
                                "lost" -> server.cpuLostTime = point.value
                            }
                            server.host = agg.host
                        } else {
                            when (state) {
                                "active" -> agg.cpuActiveTime = point.value
                                "idle" -> agg.cpuIdleTime = point.value
                                "steal" -> agg.cpuStealTime = point.value
                                "lost" -> agg.cpuLostTime = point.value
                            }
                        }
                    }
                }
                "system.power.usage" -> {
                    val agg = getHost(hosts, resource) ?: continue
                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.powerUsage = it.value }
                }
                "system.power.total" -> {
                    val agg = getHost(hosts, resource) ?: continue
                    metric.doubleSumData.points.firstOrNull()?.let { agg.powerTotal = it.value }
                }
                "system.time" -> {
                    val agg = getHost(hosts, resource) ?: continue

                    for (point in metric.longSumData.points) {
                        val server = getServer(servers, point)

                        if (server != null) {
                            when (point.attributes[STATE_KEY]) {
                                "up" -> server.uptime = point.value
                                "down" -> server.downtime = point.value
                            }
                            server.host = agg.host
                        } else {
                            when (point.attributes[STATE_KEY]) {
                                "up" -> agg.uptime = point.value
                                "down" -> agg.downtime = point.value
                            }
                        }
                    }
                }
                "system.time.boot" -> {
                    val agg = getHost(hosts, resource) ?: continue

                    for (point in metric.longGaugeData.points) {
                        val server = getServer(servers, point)

                        if (server != null) {
                            server.bootTime = point.value
                            server.host = agg.host
                        } else {
                            agg.bootTime = point.value
                        }
                    }
                }
            }
        }
    }

    /**
     * Collect the data via the [monitor].
     *
     * Emits the service record first, followed by one record per host and one per server seen so far, all
     * stamped with [now]. Collecting also advances the baselines used to compute the per-cycle deltas.
     */
    public fun collect(now: Instant, monitor: ComputeMonitor) {
        monitor.record(_service.collect(now))

        for (host in _hosts.values) {
            monitor.record(host.collect(now))
        }

        for (server in _servers.values) {
            monitor.record(server.collect(now))
        }
    }

    /**
     * Obtain the [HostAggregator] for the specified [resource], creating it on first use.
     *
     * @return The aggregator of the host or `null` if the resource has no [HOST_ID] attribute.
     */
    private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
        val id = resource.attributes[HOST_ID]
        return if (id != null) {
            hosts.computeIfAbsent(id) { HostAggregator(resource) }
        } else {
            null
        }
    }

    /**
     * Obtain the [ServerAggregator] for the specified [point], creating it on first use.
     *
     * The server is identified by the [ResourceAttributes.HOST_ID] attribute of the data point.
     *
     * @return The aggregator of the server or `null` if the point does not identify a server.
     */
    private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
        val id = point.attributes[ResourceAttributes.HOST_ID]
        return if (id != null) {
            servers.computeIfAbsent(id) { ServerAggregator(point.attributes) }
        } else {
            null
        }
    }

    /**
     * An aggregator for service metrics before they are reported.
     */
    internal class ServiceAggregator {
        @JvmField var hostsUp = 0
        @JvmField var hostsDown = 0

        @JvmField var serversPending = 0
        @JvmField var serversActive = 0

        @JvmField var attemptsSuccess = 0
        @JvmField var attemptsFailure = 0
        @JvmField var attemptsError = 0

        /**
         * Finish the aggregation for this cycle.
         *
         * The service counters are not reset: the last observed values are carried over to the next cycle.
         */
        fun collect(now: Instant): ServiceData = toServiceData(now)

        /**
         * Convert the aggregator state to an immutable [ServiceData].
         */
        private fun toServiceData(now: Instant): ServiceData {
            return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
        }
    }

    /**
     * An aggregator for host metrics before they are reported.
     */
    internal class HostAggregator(resource: Resource) {
        /**
         * The static information about this host.
         */
        val host = HostInfo(
            // Safe: an aggregator is only constructed by getHost() after the identifier was found on the resource.
            resource.attributes[HOST_ID]!!,
            resource.attributes[HOST_NAME] ?: "",
            resource.attributes[HOST_ARCH] ?: "",
            resource.attributes[HOST_NCPUS]?.toInt() ?: 0,
            resource.attributes[HOST_MEM_CAPACITY] ?: 0,
        )

        @JvmField var guestsTerminated = 0
        @JvmField var guestsRunning = 0
        @JvmField var guestsError = 0
        @JvmField var guestsInvalid = 0

        @JvmField var cpuLimit = 0.0
        @JvmField var cpuUsage = 0.0
        @JvmField var cpuDemand = 0.0
        @JvmField var cpuUtilization = 0.0

        @JvmField var cpuActiveTime = 0L
        @JvmField var cpuIdleTime = 0L
        @JvmField var cpuStealTime = 0L
        @JvmField var cpuLostTime = 0L
        private var previousCpuActiveTime = 0L
        private var previousCpuIdleTime = 0L
        private var previousCpuStealTime = 0L
        private var previousCpuLostTime = 0L

        @JvmField var powerUsage = 0.0
        @JvmField var powerTotal = 0.0
        private var previousPowerTotal = 0.0

        @JvmField var uptime = 0L
        private var previousUptime = 0L
        @JvmField var downtime = 0L
        private var previousDowntime = 0L

        /**
         * The boot time in epoch milliseconds or [Long.MIN_VALUE] if no boot time has been reported.
         */
        @JvmField var bootTime = Long.MIN_VALUE

        /**
         * Finish the aggregation for this cycle.
         */
        fun collect(now: Instant): HostData {
            val data = toHostData(now)

            // Remember the cumulative counters so the next cycle reports only the difference
            previousCpuActiveTime = cpuActiveTime
            previousCpuIdleTime = cpuIdleTime
            previousCpuStealTime = cpuStealTime
            previousCpuLostTime = cpuLostTime
            previousPowerTotal = powerTotal
            previousUptime = uptime
            previousDowntime = downtime

            // Reset intermediate state for next aggregation
            guestsTerminated = 0
            guestsRunning = 0
            guestsError = 0
            guestsInvalid = 0

            cpuLimit = 0.0
            cpuUsage = 0.0
            cpuDemand = 0.0
            cpuUtilization = 0.0

            powerUsage = 0.0

            return data
        }

        /**
         * Convert the aggregator state to an immutable [HostData] instance.
         */
        private fun toHostData(now: Instant): HostData {
            return HostData(
                now,
                host,
                guestsTerminated,
                guestsRunning,
                guestsError,
                guestsInvalid,
                cpuLimit,
                cpuUsage,
                cpuDemand,
                cpuUtilization,
                cpuActiveTime - previousCpuActiveTime,
                cpuIdleTime - previousCpuIdleTime,
                cpuStealTime - previousCpuStealTime,
                cpuLostTime - previousCpuLostTime,
                powerUsage,
                powerTotal - previousPowerTotal,
                uptime - previousUptime,
                downtime - previousDowntime,
                if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
            )
        }
    }

    /**
     * An aggregator for server metrics before they are reported.
     */
    internal class ServerAggregator(attributes: Attributes) {
        /**
         * The static information about this server.
         *
         * NOTE(review): every attribute below is dereferenced with `!!`, so a data point that identifies a server
         * but lacks one of them fails with a [NullPointerException] - confirm the producer always sets all of them.
         */
        val server = ServerInfo(
            attributes[ResourceAttributes.HOST_ID]!!,
            attributes[ResourceAttributes.HOST_NAME]!!,
            attributes[ResourceAttributes.HOST_TYPE]!!,
            attributes[ResourceAttributes.HOST_ARCH]!!,
            attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
            attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
            attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
            attributes[AttributeKey.longKey("host.mem_capacity")]!!,
        )

        /**
         * The [HostInfo] of the host on which the server is hosted.
         */
        var host: HostInfo? = null

        @JvmField var uptime: Long = 0
        private var previousUptime = 0L
        @JvmField var downtime: Long = 0
        private var previousDowntime = 0L

        /**
         * The boot time in epoch milliseconds or [Long.MIN_VALUE] if no boot time has been reported.
         *
         * The sentinel must match the check in [toServerData]; a default of zero would be reported as a boot at
         * the epoch instead of an absent boot time.
         */
        @JvmField var bootTime: Long = Long.MIN_VALUE
        @JvmField var schedulingLatency = 0L
        @JvmField var cpuLimit = 0.0
        @JvmField var cpuActiveTime = 0L
        @JvmField var cpuIdleTime = 0L
        @JvmField var cpuStealTime = 0L
        @JvmField var cpuLostTime = 0L
        private var previousCpuActiveTime = 0L
        private var previousCpuIdleTime = 0L
        private var previousCpuStealTime = 0L
        private var previousCpuLostTime = 0L

        /**
         * Finish the aggregation for this cycle.
         */
        fun collect(now: Instant): ServerData {
            val data = toServerData(now)

            // Remember the cumulative counters so the next cycle reports only the difference
            previousUptime = uptime
            previousDowntime = downtime
            previousCpuActiveTime = cpuActiveTime
            previousCpuIdleTime = cpuIdleTime
            previousCpuStealTime = cpuStealTime
            previousCpuLostTime = cpuLostTime

            // The host assignment and CPU limit are only valid for the cycle in which they were reported
            host = null
            cpuLimit = 0.0

            return data
        }

        /**
         * Convert the aggregator state into an immutable [ServerData].
         */
        private fun toServerData(now: Instant): ServerData {
            return ServerData(
                now,
                server,
                host,
                uptime - previousUptime,
                downtime - previousDowntime,
                if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
                schedulingLatency,
                cpuLimit,
                cpuActiveTime - previousCpuActiveTime,
                cpuIdleTime - previousCpuIdleTime,
                cpuStealTime - previousCpuStealTime,
                cpuLostTime - previousCpuLostTime
            )
        }
    }

    private companion object {
        private val STATE_KEY = AttributeKey.stringKey("state")
        private val RESULT_KEY = AttributeKey.stringKey("result")
    }
}
*/ public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { + /** + * A [ComputeMetricAggregator] that actually performs the aggregation. + */ + private val agg = ComputeMetricAggregator() + override fun export(metrics: Collection): CompletableResultCode { return try { - reportServiceMetrics(metrics) - reportHostMetrics(metrics) - reportServerMetrics(metrics) + agg.process(metrics) + agg.collect(clock.instant(), monitor) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() @@ -53,229 +49,4 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - - private fun reportServiceMetrics(metrics: Collection) { - monitor.record(extractServiceMetrics(clock.millis(), metrics)) - } - - private val hosts = mutableMapOf() - private val servers = mutableMapOf() - - private fun reportHostMetrics(metrics: Collection) { - val hosts = hosts - val servers = servers - - for (metric in metrics) { - val resource = metric.resource - val hostId = resource.attributes[HOST_ID] ?: continue - val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) } - agg.accept(metric) - } - - val monitor = monitor - val now = clock.millis() - for ((_, server) in servers) { - server.record(monitor, now) - } - } - - private fun reportServerMetrics(metrics: Collection) { - val hosts = hosts - - for (metric in metrics) { - val resource = metric.resource - val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host } - - when (metric.name) { - "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point -> - agg.schedulingLatency = point.sum / point.count - } - "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point -> - agg.uptime = point.value - } - 
"guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point -> - agg.downtime = point.value - } - } - } - - val monitor = monitor - val now = clock.millis() - for ((_, host) in hosts) { - host.record(monitor, now) - } - } - - /** - * Helper function to map a metric by the server. - */ - private inline fun

mapByServer(points: Collection

, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) { - for (point in points) { - val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) } - - if (host != null) { - agg.host = host - } - - block(agg, point) - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - private class HostAggregator(resource: Resource) { - /** - * The static information about this host. - */ - val host = HostInfo( - resource.attributes[HOST_ID]!!, - resource.attributes[HOST_NAME]!!, - resource.attributes[HOST_ARCH]!!, - resource.attributes[HOST_NCPUS]!!.toInt(), - resource.attributes[HOST_MEM_CAPACITY]!!, - ) - - private var totalWork: Double = 0.0 - private var previousTotalWork = 0.0 - private var grantedWork: Double = 0.0 - private var previousGrantedWork = 0.0 - private var overcommittedWork: Double = 0.0 - private var previousOvercommittedWork = 0.0 - private var interferedWork: Double = 0.0 - private var previousInterferedWork = 0.0 - private var cpuUsage: Double = 0.0 - private var cpuDemand: Double = 0.0 - private var instanceCount: Int = 0 - private var powerDraw: Double = 0.0 - private var uptime: Long = 0 - private var previousUptime = 0L - private var downtime: Long = 0 - private var previousDowntime = 0L - - fun record(monitor: ComputeMonitor, now: Long) { - monitor.record( - HostData( - now, - host, - totalWork - previousTotalWork, - grantedWork - previousGrantedWork, - overcommittedWork - previousOvercommittedWork, - interferedWork - previousInterferedWork, - cpuUsage, - cpuDemand, - instanceCount, - powerDraw, - uptime - previousUptime, - downtime - previousDowntime, - ) - ) - - previousTotalWork = totalWork - previousGrantedWork = grantedWork - previousOvercommittedWork = overcommittedWork - previousInterferedWork = interferedWork - previousUptime = uptime - previousDowntime = downtime - reset() - } - - /** - * Accept the [MetricData] 
for this host. - */ - fun accept(data: MetricData) { - when (data.name) { - "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value - "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value - "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value - "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value - "power.usage" -> powerDraw = acceptHistogram(data) - "cpu.usage" -> cpuUsage = acceptHistogram(data) - "cpu.demand" -> cpuDemand = acceptHistogram(data) - "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt() - "host.time.up" -> uptime = data.longSumData.points.first().value - "host.time.down" -> downtime = data.longSumData.points.first().value - } - } - - private fun acceptHistogram(data: MetricData): Double { - return when (data.type) { - MetricDataType.HISTOGRAM -> { - val point = data.doubleHistogramData.points.first() - point.sum / point.count - } - MetricDataType.SUMMARY -> { - val point = data.doubleSummaryData.points.first() - point.sum / point.count - } - else -> error("Invalid metric type") - } - } - - private fun reset() { - totalWork = 0.0 - grantedWork = 0.0 - overcommittedWork = 0.0 - interferedWork = 0.0 - cpuUsage = 0.0 - cpuDemand = 0.0 - instanceCount = 0 - powerDraw = 0.0 - uptime = 0L - downtime = 0L - } - } - - /** - * An aggregator for server metrics before they are reported. - */ - private class ServerAggregator(attributes: Attributes) { - /** - * The static information about this server. 
- */ - val server = ServerInfo( - attributes[ResourceAttributes.HOST_ID]!!, - attributes[ResourceAttributes.HOST_NAME]!!, - attributes[ResourceAttributes.HOST_TYPE]!!, - attributes[ResourceAttributes.HOST_ARCH]!!, - attributes[ResourceAttributes.HOST_IMAGE_ID]!!, - attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, - attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), - attributes[AttributeKey.longKey("host.mem_capacity")]!!, - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - var host: HostInfo? = null - - @JvmField var uptime: Long = 0 - private var previousUptime = 0L - @JvmField var downtime: Long = 0 - private var previousDowntime = 0L - @JvmField var schedulingLatency = 0.0 - - fun record(monitor: ComputeMonitor, now: Long) { - monitor.record( - ServerData( - now, - server, - null, - uptime - previousUptime, - downtime - previousDowntime, - ) - ) - - previousUptime = uptime - previousDowntime = downtime - reset() - } - - private fun reset() { - host = null - uptime = 0L - downtime = 0L - } - } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index f3690ee8..25d346fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,64 +22,31 @@ package org.opendc.telemetry.compute -import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData +import java.time.Instant /** * Collect the metrics of the compute service. 
*/ -public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData { +public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) } /** * Extract a [ServiceData] object from the specified list of metric data. */ -public fun extractServiceMetrics(timestamp: Long, metrics: Collection): ServiceData { - val resultKey = AttributeKey.stringKey("result") - val stateKey = AttributeKey.stringKey("state") - - var hostsUp = 0 - var hostsDown = 0 - - var serversPending = 0 - var serversActive = 0 - - var attemptsSuccess = 0 - var attemptsFailure = 0 - var attemptsError = 0 - - for (metric in metrics) { - when (metric.name) { - "scheduler.hosts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[stateKey]) { - "up" -> hostsUp = point.value.toInt() - "down" -> hostsDown = point.value.toInt() - } - } - } - "scheduler.servers" -> { - for (point in metric.longSumData.points) { - when (point.attributes[stateKey]) { - "pending" -> serversPending = point.value.toInt() - "active" -> serversActive = point.value.toInt() - } - } - } - "scheduler.attempts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[resultKey]) { - "success" -> attemptsSuccess = point.value.toInt() - "failure" -> attemptsFailure = point.value.toInt() - "error" -> attemptsError = point.value.toInt() - } - } - } +public fun extractServiceMetrics(timestamp: Instant, metrics: Collection): ServiceData { + lateinit var serviceData: ServiceData + val agg = ComputeMetricAggregator() + val monitor = object : ComputeMonitor { + override fun record(data: ServiceData) { + serviceData = data } } - return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) + agg.process(metrics) + agg.collect(timestamp, monitor) + return serviceData } diff --git 
a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index e3ecda3d..8e787b97 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -22,20 +22,29 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for a particular host. */ public data class HostData( - public val timestamp: Long, - public val host: HostInfo, - public val totalWork: Double, - public val grantedWork: Double, - public val overcommittedWork: Double, - public val interferedWork: Double, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val instanceCount: Int, - public val powerDraw: Double, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val host: HostInfo, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, + val cpuLimit: Double, + val cpuUsage: Double, + val cpuDemand: Double, + val cpuUtilization: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, + val powerUsage: Double, + val powerTotal: Double, + val uptime: Long, + val downtime: Long, + val bootTime: Instant? 
) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 7fde86d9..c48bff3a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -22,13 +22,22 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for a particular server. */ public data class ServerData( - public val timestamp: Long, - public val server: ServerInfo, - public val host: HostInfo?, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val server: ServerInfo, + val host: HostInfo?, + val uptime: Long, + val downtime: Long, + val bootTime: Instant?, + val schedulingLatency: Long, + val cpuLimit: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index da2ebdf4..6db1399d 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -22,16 +22,18 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for the compute service. 
*/ public data class ServiceData( - public val timestamp: Long, - public val hostsUp: Int, - public val hostsDown: Int, - public val serversPending: Int, - public val serversActive: Int, - public val attemptsSuccess: Int, - public val attemptsFailure: Int, - public val attemptsError: Int + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int ) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 8f19ab81..07f0ff7f 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -44,7 +44,7 @@ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List, private val exporter: MetricExporter, - private val exportInterval: Duration = Duration.ofMinutes(1) + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 960d5ebd..483558e1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -26,8 +26,6 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long -import io.opentelemetry.api.metrics.MeterProvider 
-import io.opentelemetry.sdk.metrics.SdkMeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.experiments.capelin.* @@ -49,7 +47,6 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration import org.opendc.web.client.model.Scenario @@ -187,11 +184,6 @@ class RunnerCli : CliktCommand(name = "runner") { val seeder = Random(repeat.toLong()) - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) @@ -215,7 +207,7 @@ class RunnerCli : CliktCommand(name = "runner") { interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1)) try { simulator.run(trace) @@ -224,7 +216,7 @@ class RunnerCli : CliktCommand(name = "runner") { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index e0e3488f..a0c281e8 100644 --- 
a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -65,10 +65,10 @@ public class ScenarioManager(private val client: ApiClient) { client.updateJob( id, SimulationState.FINISHED, mapOf( - "total_requested_burst" to results.map { it.totalWork }, - "total_granted_burst" to results.map { it.totalGrantedWork }, - "total_overcommitted_burst" to results.map { it.totalOvercommittedWork }, - "total_interfered_burst" to results.map { it.totalInterferedWork }, + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, "mean_cpu_usage" to results.map { it.meanCpuUsage }, "mean_cpu_demand" to results.map { it.meanCpuDemand }, "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt index 5f2c474b..bb412738 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -33,24 +33,23 @@ import kotlin.math.roundToLong */ class WebComputeMonitor : ComputeMonitor { override fun record(data: HostData) { - val duration = data.uptime val slices = data.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalWork + data.totalWork, - hostAggregateMetrics.totalGrantedWork + data.grantedWork, - hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, - hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, - hostAggregateMetrics.totalPowerDraw + 
(duration * data.powerDraw) / 3600, + hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, + hostAggregateMetrics.totalStealTime + data.cpuStealTime, + hostAggregateMetrics.totalLostTime + data.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + data.powerTotal, hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + data.instanceCount * slices + hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices ) hostMetrics.compute(data.host.id) { _, prev -> HostMetrics( data.cpuUsage + (prev?.cpuUsage ?: 0.0), data.cpuDemand + (prev?.cpuDemand ?: 0.0), - data.instanceCount + (prev?.instanceCount ?: 0), + data.guestsRunning + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) } @@ -58,13 +57,13 @@ class WebComputeMonitor : ComputeMonitor { private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() private val hostMetrics: MutableMap = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60 * 1000 + private val SLICE_LENGTH: Long = 5 * 60 data class AggregateHostMetrics( - val totalWork: Double = 0.0, - val totalGrantedWork: Double = 0.0, - val totalOvercommittedWork: Double = 0.0, - val totalInterferedWork: Double = 0.0, + val totalActiveTime: Long = 0L, + val totalIdleTime: Long = 0L, + val totalStealTime: Long = 0L, + val totalLostTime: Long = 0L, val totalPowerDraw: Double = 0.0, val totalFailureSlices: Double = 0.0, val totalFailureVmSlices: Double = 0.0, @@ -99,10 +98,10 @@ class WebComputeMonitor : ComputeMonitor { fun getResult(): Result { return Result( - hostAggregateMetrics.totalWork, - hostAggregateMetrics.totalGrantedWork, - hostAggregateMetrics.totalOvercommittedWork, - hostAggregateMetrics.totalInterferedWork, + hostAggregateMetrics.totalActiveTime, + hostAggregateMetrics.totalIdleTime, + hostAggregateMetrics.totalStealTime, + hostAggregateMetrics.totalLostTime, hostMetrics.map { it.value.cpuUsage / it.value.count 
}.average(), hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), @@ -118,10 +117,10 @@ class WebComputeMonitor : ComputeMonitor { } data class Result( - val totalWork: Double, - val totalGrantedWork: Double, - val totalOvercommittedWork: Double, - val totalInterferedWork: Double, + val totalActiveTime: Long, + val totalIdleTime: Long, + val totalStealTime: Long, + val totalLostTime: Long, val meanCpuUsage: Double, val meanCpuDemand: Double, val meanNumDeployedImages: Double, -- cgit v1.2.3 From e2537c59bef0645b948e92553cc5a42a8c0f7256 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Sep 2021 21:34:00 +0200 Subject: feat(capelin): Use logical types for Parquet output columns This change updates the output schema for the experiment data to use logical types where possible. This adds additional context for the writer and the reader on how to process the column (efficiently). --- .../capelin/export/parquet/AvroUtils.kt | 44 ++++++++++++++++++++++ .../export/parquet/ParquetHostDataWriter.kt | 15 ++++---- .../export/parquet/ParquetServerDataWriter.kt | 18 +++++---- .../export/parquet/ParquetServiceDataWriter.kt | 2 +- 4 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt new file mode 100644 index 00000000..a4676f31 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any 
person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("AvroUtils") +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema + +/** + * Schema for UUID type. + */ +internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + +/** + * Schema for timestamp type. + */ +internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. 
+ */ +internal fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 36207045..58388cb1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -47,8 +47,6 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id - builder["num_cpus"] = data.host.cpuCount - builder["mem_capacity"] = data.host.memCapacity builder["uptime"] = data.uptime builder["downtime"] = data.downtime @@ -57,12 +55,15 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["boot_time"] = bootTime.toEpochMilli() } + builder["cpu_count"] = data.host.cpuCount builder["cpu_limit"] = data.cpuLimit builder["cpu_time_active"] = data.cpuActiveTime builder["cpu_time_idle"] = data.cpuIdleTime builder["cpu_time_steal"] = data.cpuStealTime builder["cpu_time_lost"] = data.cpuLostTime + builder["mem_limit"] = data.host.memCapacity + builder["power_total"] = data.powerTotal builder["guests_terminated"] = data.guestsTerminated @@ -78,18 +79,18 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") - .requiredString("host_id") - .requiredInt("num_cpus") - .requiredLong("mem_capacity") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() .requiredLong("uptime") 
.requiredLong("downtime") - .optionalLong("boot_time") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") .requiredLong("cpu_time_idle") .requiredLong("cpu_time_steal") .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") .requiredDouble("power_total") .requiredInt("guests_terminated") .requiredInt("guests_running") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index c5a5e7c0..43b5f469 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -30,6 +30,7 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.ServerData import java.io.File +import java.util.* /** * A Parquet event writer for [ServerData]s. 
@@ -49,8 +50,6 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["server_id"] = data.server.id builder["host_id"] = data.host?.id - builder["num_vcpus"] = data.server.cpuCount - builder["mem_capacity"] = data.server.memCapacity builder["uptime"] = data.uptime builder["downtime"] = data.downtime @@ -60,11 +59,14 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : } builder["scheduling_latency"] = data.schedulingLatency + builder["cpu_count"] = data.server.cpuCount builder["cpu_limit"] = data.cpuLimit builder["cpu_time_active"] = data.cpuActiveTime builder["cpu_time_idle"] = data.cpuIdleTime builder["cpu_time_steal"] = data.cpuStealTime builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity } override fun toString(): String = "server-writer" @@ -74,20 +76,20 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .record("server") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") - .requiredString("server_id") - .optionalString("host_id") - .requiredInt("num_vcpus") - .requiredLong("mem_capacity") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() .requiredLong("uptime") .requiredLong("downtime") - .optionalLong("boot_time") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() .requiredLong("scheduling_latency") + .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") .requiredLong("cpu_time_idle") .requiredLong("cpu_time_steal") .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index d9ca55cb..2928f445 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -52,7 +52,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() .requiredInt("hosts_up") .requiredInt("hosts_down") .requiredInt("servers_pending") -- cgit v1.2.3