From 144d9d0c118097900c086b7fb8b1cf22a788592b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 13 Sep 2021 12:22:32 +0200 Subject: build(telemetry): Update to OpenTelemetry 1.6.0 This change updates the opentelemetry-java library to version 1.6.0. --- .../kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 44cf92a8..cf88535d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -110,7 +110,7 @@ class CapelinIntegrationTest { { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } -- cgit v1.2.3 From 7960791430b0536df4704493c01d32e38f37f022 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 12:52:17 +0200 Subject: refactor(experiments): Remove energy experiments shell This change removes the energy experiments. The experiments only provided a setup for the original experiments and are not able to reproduce the results without further work.
--- .../opendc-experiments-energy21/.gitignore | 3 - .../opendc-experiments-energy21/build.gradle.kts | 47 ----- .../experiments/energy21/EnergyExperiment.kt | 208 --------------------- .../src/main/resources/application.conf | 14 -- 4 files changed, 272 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-energy21/.gitignore delete mode 100644 opendc-experiments/opendc-experiments-energy21/build.gradle.kts delete mode 100644 opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt delete mode 100644 opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-energy21/.gitignore b/opendc-experiments/opendc-experiments-energy21/.gitignore deleted file mode 100644 index 55da79f8..00000000 --- a/opendc-experiments/opendc-experiments-energy21/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -input/ -output/ -.ipynb_checkpoints diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts deleted file mode 100644 index cc58e5f1..00000000 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -description = "Experiments for the OpenDC Energy work" - -/* Build configuration */ -plugins { - `experiment-conventions` - `testing-conventions` -} - -dependencies { - api(platform(projects.opendcPlatform)) - api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcExperiments.opendcExperimentsCapelin) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.kotlin.logging) - implementation(libs.config) - - implementation(libs.parquet) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } -} diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt deleted file mode 100644 index d9194969..00000000 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.energy21 - -import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import mu.KotlinLogging -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor -import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.* -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor -import java.io.File -import java.time.Clock -import java.util.* - -/** - * Experiments for the OpenDC project on Energy modeling. - */ -public class EnergyExperiment : Experiment("Energy Modeling 2021") { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The configuration to use. - */ - private val config = ConfigFactory.load().getConfig("opendc.experiments.energy21") - - /** - * The traces to test. - */ - private val trace by anyOf("solvinity") - - /** - * The power models to test. - */ - private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION) - - override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(), - subsetSize = Int.MAX_VALUE - ) - - val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) - - withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - } - - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) - logger.debug { - "Finish SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" - } - } - - /** - * Construct the environment for a simulated compute service.. 
- */ - public suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - scheduler: ComputeScheduler, - block: suspend CoroutineScope.(ComputeService) -> Unit - ): Unit = coroutineScope { - val model = createMachineModel() - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = List(64) { id -> - SimHost( - UUID(0, id.toLong()), - "node-$id", - model, - emptyMap(), - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - PerformanceScalingGovernor(), - powerModel.driver - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } - } - - /** - * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html - */ - private fun createMachineModel(): MachineModel { - val node = ProcessingNode("AMD", "am64", "EPYC 7742", 64) - val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } - val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } - - return MachineModel(cpus, memory) - } - - /** - * The power models to test. - */ - public enum class PowerModelType { - CUBIC { - override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4)) - }, - - LINEAR { - override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4)) - }, - - SQRT { - override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4)) - }, - - SQUARE { - override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4)) - }, - - INTERPOLATION { - override val driver: PowerDriver = SimplePowerDriver( - InterpolationPowerModel( - listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0) - ) - ) - }, - - MSE { - override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4)) - }, - - ASYMPTOTIC { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false)) - }, - - ASYMPTOTIC_DVFS { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true)) - }; - - public abstract val driver: PowerDriver - } -} diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf b/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf deleted file mode 100644 index 263da0fe..00000000 --- a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf +++ /dev/null @@ -1,14 +0,0 @@ -# Default configuration for the energy experiments -opendc.experiments.energy21 { - # Path to the directory containing the input traces - trace-path = input/traces - - # Path to the output directory to write the results to - output-path = output -} - -opendc.experiments.capelin { - env-path = input/environments/ - trace-path = input/traces/ - output-path = output -} -- cgit v1.2.3 From eef8ea3ab40a4e4a12ba96f839c35c5804884bc1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 13 Sep 2021 12:25:51 +0200 Subject: refactor(capelin): Improve ParquetDataWriter implementation This change improves the ParquetDataWriter class to support more complex use-cases. It now allows subclasses to modify the writer options. 
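For illustration, a subclass can now hook into both extension points roughly as sketched below. This is a minimal sketch only: ExampleData, its schema, the example_id column, and the generic type parameters are assumptions for the example, while the buildWriter and convert hooks themselves come from the diff below.

    package org.opendc.experiments.capelin.export.parquet

    import org.apache.avro.Schema
    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericData
    import org.apache.avro.generic.GenericRecordBuilder
    import org.apache.parquet.avro.AvroParquetWriter
    import org.apache.parquet.hadoop.ParquetWriter
    import java.io.File

    /** Hypothetical record type, used only for this sketch. */
    data class ExampleData(val timestamp: Long, val id: String)

    class ParquetExampleDataWriter(path: File, bufferSize: Int) :
        ParquetDataWriter<ExampleData>(path, SCHEMA, bufferSize) {

        // New hook: the subclass tunes the Parquet writer options itself,
        // e.g. enabling dictionary encoding for a low-cardinality column.
        override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
            return builder
                .withDictionaryEncoding("example_id", true)
                .build()
        }

        // Records are assembled through a GenericRecordBuilder instead of a
        // (T, GenericData.Record) -> Unit converter lambda.
        override fun convert(builder: GenericRecordBuilder, data: ExampleData) {
            builder["timestamp"] = data.timestamp
            builder["example_id"] = data.id
        }

        override fun toString(): String = "example-writer"

        companion object {
            private val SCHEMA: Schema = SchemaBuilder
                .record("example")
                .namespace("org.opendc.telemetry.compute")
                .fields()
                .requiredLong("timestamp")
                .requiredString("example_id")
                .endRecord()
        }
    }
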
In addition to this, a subclass for writing server data is added. --- .../capelin/export/parquet/ParquetDataWriter.kt | 125 ++++++++++++--------- .../capelin/export/parquet/ParquetExportMonitor.kt | 14 ++- .../export/parquet/ParquetHostDataWriter.kt | 74 +++++++----- .../export/parquet/ParquetServerDataWriter.kt | 73 ++++++++++++ .../export/parquet/ParquetServiceDataWriter.kt | 46 ++++---- 5 files changed, 225 insertions(+), 107 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index c5cb80e2..5684bde9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -25,11 +25,13 @@ package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.example.Paper.schema import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,50 +40,94 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. */ -public open class ParquetDataWriter( - private val path: File, +abstract class ParquetDataWriter( + path: File, private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { + bufferSize: Int = 4096 +) : AutoCloseable { /** * The logging instance to use. */ private val logger = KotlinLogging.logger {} /** - * The writer to write the Parquet file. + * The queue of commands to process. */ - private val writer = AvroParquetWriter.builder(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) /** - * The queue of commands to process. + * An exception to be propagated to the actual writer. */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + private var exception: Throwable? = null /** * The thread that is responsible for writing the Parquet records. 
*/ - private val writerThread = thread(start = false, name = this.toString()) { run() } + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) /** * Write the specified metrics to the database. */ - public fun write(event: T) { - queue.put(Action.Write(event)) + fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) } /** * Signal the writer to stop. */ - public override fun close() { - queue.put(Action.Stop) + override fun close() { + writerThread.interrupt() writerThread.join() } @@ -90,38 +136,11 @@ public open class ParquetDataWriter( } /** - * Start the writer thread. + * Process the specified [data] to be written to the Parquet file. */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.data as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. 
- */ - public data class Write(val data: T) : Action() + private fun process(writer: ParquetWriter, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index 79b84e9d..b057e932 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val hostWriter = ParquetHostDataWriter( File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + private val serviceWriter = ParquetServiceDataWriter( File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + override fun record(data: ServerData) { + serverWriter.write(data) + } + override fun record(data: HostData) { hostWriter.write(data) } @@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int override fun close() { hostWriter.close() serviceWriter.close() + serverWriter.close() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 8912c12e..7062a275 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -25,6 +25,10 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -32,42 +36,52 @@ import java.io.File * A Parquet event writer for [HostData]s. 
*/ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, schema, convert, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { - override fun toString(): String = "host-writer" + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } - public companion object { - private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> - record.put("host_id", data.host.name) - record.put("state", data.host.state.name) - record.put("timestamp", data.timestamp) - record.put("total_work", data.totalWork) - record.put("granted_work", data.grantedWork) - record.put("overcommitted_work", data.overcommittedWork) - record.put("interfered_work", data.interferedWork) - record.put("cpu_usage", data.cpuUsage) - record.put("cpu_demand", data.cpuDemand) - record.put("power_draw", data.powerDraw) - record.put("instance_count", data.instanceCount) - record.put("cores", data.host.model.cpuCount) - } + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp + builder["host_id"] = data.host.name + builder["powered_on"] = data.host.state == HostState.UP + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + builder["total_work"] = data.totalWork + builder["granted_work"] = data.grantedWork + builder["overcommitted_work"] = data.overcommittedWork + builder["interfered_work"] = data.interferedWork + builder["cpu_usage"] = data.cpuUsage + builder["cpu_demand"] = data.cpuDemand + builder["power_draw"] = data.powerDraw + builder["num_instances"] = data.instanceCount + builder["num_cpus"] = data.host.model.cpuCount + } + + override fun toString(): String = "host-writer" - private val schema: Schema = SchemaBuilder + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("requested_work").type().longType().noDefault() - .name("granted_work").type().longType().noDefault() - .name("overcommitted_work").type().longType().noDefault() - .name("interfered_work").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("instance_count").type().intType().noDefault() - .name("cores").type().intType().noDefault() + .requiredLong("timestamp") + .requiredString("host_id") + .requiredBoolean("powered_on") + .requiredLong("uptime") + .requiredLong("downtime") + .requiredDouble("total_work") + .requiredDouble("granted_work") + .requiredDouble("overcommitted_work") + .requiredDouble("interfered_work") + .requiredDouble("cpu_usage") + .requiredDouble("cpu_demand") + .requiredDouble("power_draw") + .requiredInt("num_instances") + .requiredInt("num_cpus") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..9904adde --- /dev/null +++ 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import java.io.File + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("state", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp + builder["server_id"] = data.server.uid.toString() + builder["state"] = data.server.state + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + builder["num_vcpus"] = data.server.flavor.cpuCount + builder["mem_capacity"] = data.server.flavor.memorySize + } + + override fun toString(): String = "server-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .requiredLong("timestamp") + .requiredString("server_id") + .requiredString("state") + .requiredLong("uptime") + .requiredLong("downtime") + .requiredInt("num_vcpus") + .requiredLong("mem_capacity") + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 36d630f3..e1428fe7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -24,7 +24,7 @@ package 
org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceData import java.io.File @@ -32,34 +32,34 @@ import java.io.File * A Parquet event writer for [ServiceData]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, schema, convert, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { - override fun toString(): String = "service-writer" + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp + builder["host_total_count"] = data.hostCount + builder["host_available_count"] = data.activeHostCount + builder["instance_total_count"] = data.instanceCount + builder["instance_active_count"] = data.runningInstanceCount + builder["instance_inactive_count"] = data.finishedInstanceCount + builder["instance_waiting_count"] = data.queuedInstanceCount + builder["instance_failed_count"] = data.failedInstanceCount + } - public companion object { - private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> - record.put("timestamp", data.timestamp) - record.put("host_total_count", data.hostCount) - record.put("host_available_count", data.activeHostCount) - record.put("instance_total_count", data.instanceCount) - record.put("instance_active_count", data.runningInstanceCount) - record.put("instance_inactive_count", data.finishedInstanceCount) - record.put("instance_waiting_count", data.queuedInstanceCount) - record.put("instance_failed_count", data.failedInstanceCount) - } + override fun toString(): String = "service-writer" - private val schema: Schema = SchemaBuilder + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("instance_total_count").type().intType().noDefault() - .name("instance_active_count").type().intType().noDefault() - .name("instance_inactive_count").type().intType().noDefault() - .name("instance_waiting_count").type().intType().noDefault() - .name("instance_failed_count").type().intType().noDefault() + .requiredLong("timestamp") + .requiredInt("host_total_count") + .requiredInt("host_available_count") + .requiredInt("instance_total_count") + .requiredInt("instance_active_count") + .requiredInt("instance_inactive_count") + .requiredInt("instance_waiting_count") + .requiredInt("instance_failed_count") .endRecord() } } -- cgit v1.2.3 From 3ca64e0110adab65526a0ccfd5b252e9f047ab10 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 14:41:05 +0200 Subject: refactor(telemetry): Create separate MeterProvider per service/host This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario. 
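In practice the pattern boils down to building one SdkMeterProvider per component and attaching a Resource that identifies it. The sketch below assumes OpenTelemetry SDK 1.6.0; the helper name, its parameters, and the use of SERVICE_NAME for every component are illustrative only, the actual wiring lives in ComputeServiceSimulator further down.

    import io.opentelemetry.sdk.metrics.SdkMeterProvider
    import io.opentelemetry.sdk.resources.Resource
    import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
    import org.opendc.telemetry.sdk.toOtelClock
    import java.time.Clock

    // One meter provider per service/host, each carrying its own Resource so
    // that exported metrics can be attributed to the component that produced
    // them. Function and parameter names here are illustrative only.
    fun meterProviderFor(clock: Clock, componentName: String): SdkMeterProvider {
        val resource = Resource.builder()
            .put(ResourceAttributes.SERVICE_NAME, componentName)
            .build()

        return SdkMeterProvider.builder()
            .setClock(clock.toOtelClock()) // drive metric timestamps off the simulation clock
            .setResource(resource)
            .build()
    }

The resulting providers are then tracked as metric producers and drained by a single reader, as the Portfolio changes below do with CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)).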
--- .../experiments/capelin/ExperimentHelpers.kt | 256 --------------------- .../org/opendc/experiments/capelin/Portfolio.kt | 67 +++--- .../export/parquet/ParquetHostDataWriter.kt | 7 +- .../export/parquet/ParquetServerDataWriter.kt | 8 +- .../experiments/capelin/util/ComputeSchedulers.kt | 86 +++++++ .../capelin/util/ComputeServiceSimulator.kt | 222 ++++++++++++++++++ .../experiments/capelin/util/FailureModel.kt | 38 +++ .../experiments/capelin/util/FailureModels.kt | 97 ++++++++ .../experiments/capelin/CapelinIntegrationTest.kt | 190 ++++++++------- .../experiments/serverless/ServerlessExperiment.kt | 3 +- 10 files changed, 574 insertions(+), 400 deletions(-) delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? 
= null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. 
- */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..f7f9336e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import 
org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. */ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,43 +122,36 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() - monitor.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() } - val monitorResults = 
collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { "Finish " + "SUBMIT=${monitorResults.instanceCount} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 7062a275..fa00fc35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -28,7 +28,6 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -46,8 +45,8 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: HostData) { builder["timestamp"] = data.timestamp - builder["host_id"] = data.host.name - builder["powered_on"] = data.host.state == HostState.UP + builder["host_id"] = data.host.id + builder["powered_on"] = true builder["uptime"] = data.uptime builder["downtime"] = data.downtime builder["total_work"] = data.totalWork @@ -58,7 +57,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["cpu_demand"] = data.cpuDemand builder["power_draw"] = data.powerDraw builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.model.cpuCount + builder["num_cpus"] = data.host.cpuCount } override fun toString(): String = "host-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index 9904adde..bb2db4b7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -46,12 +46,12 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServerData) { builder["timestamp"] = data.timestamp - builder["server_id"] = data.server.uid.toString() - builder["state"] = data.server.state + builder["server_id"] = data.server + // builder["state"] = data.server.state builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["num_vcpus"] = data.server.flavor.cpuCount - builder["mem_capacity"] = data.server.flavor.memorySize + // builder["num_vcpus"] = data.server.flavor.cpuCount + // builder["mem_capacity"] = data.server.flavor.memorySize } override fun toString(): String = "server-writer" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 
index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. 
+ */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a [ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List + get() = _metricProducers + private val _metricProducers = mutableListOf() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. 
+ */ + suspend fun run(reader: TraceReader) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? 
= null + ): Pair { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. 
+ */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. 
+ * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index cf88535d..f4cf3e5e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* +import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. @@ -59,12 +61,21 @@ class CapelinIntegrationTest { */ private lateinit var monitor: TestExperimentReporter + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + /** * Setup the experimental environment. 
*/ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,26 +83,26 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -106,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, ) } @@ -120,27 +131,26 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, 
ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -151,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -164,10 +174,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,20 +183,24 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -201,10 +211,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, + { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -214,39 +224,27 
@@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( "Finish " + "SUBMIT=${serviceMetrics.instanceCount} " + @@ -257,9 +255,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -291,10 +289,10 @@ class CapelinIntegrationTest { var totalPowerDraw = 0.0 override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() + this.totalWork += data.totalWork.roundToLong() + totalGrantedWork += data.grantedWork.roundToLong() + totalOvercommittedWork += data.overcommittedWork.roundToLong() + totalInterferedWork += data.interferedWork.roundToLong() totalPowerDraw += data.powerDraw } } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 650416f5..3312d6c0 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import 
org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.time.Duration import java.util.* import kotlin.math.max @@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") { val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000)) + FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) val client = service.newClient() coroutineScope { -- cgit v1.2.3 From 8d899e29dbd757f6df320212d6e0d77ce8216ab9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 15:38:38 +0200 Subject: refactor(telemetry): Standardize compute scheduler metrics This change updates the OpenDC compute service implementation with multiple meters that follow the OpenTelemetry conventions. --- .../org/opendc/experiments/capelin/Portfolio.kt | 11 ++--- .../export/parquet/ParquetServiceDataWriter.kt | 28 ++++++------ .../experiments/capelin/CapelinIntegrationTest.kt | 52 ++++++++++++---------- 3 files changed, 48 insertions(+), 43 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f7f9336e..3ec424f1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -153,11 +153,12 @@ abstract class Portfolio(name: String) : Experiment(name) { val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.activeHostCount}" + "Scheduler " + + "Success=${monitorResults.attemptsSuccess} " + + "Failure=${monitorResults.attemptsFailure} " + + "Error=${monitorResults.attemptsError} " + + "Pending=${monitorResults.serversPending} " + + "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index e1428fe7..29b48878 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -36,13 +36,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : override fun convert(builder: GenericRecordBuilder, data: ServiceData) { builder["timestamp"] = data.timestamp - builder["host_total_count"] = data.hostCount - builder["host_available_count"] = data.activeHostCount - builder["instance_total_count"] = 
data.instanceCount - builder["instance_active_count"] = data.runningInstanceCount - builder["instance_inactive_count"] = data.finishedInstanceCount - builder["instance_waiting_count"] = data.queuedInstanceCount - builder["instance_failed_count"] = data.failedInstanceCount + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError } override fun toString(): String = "service-writer" @@ -53,13 +53,13 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .namespace("org.opendc.telemetry.compute") .fields() .requiredLong("timestamp") - .requiredInt("host_total_count") - .requiredInt("host_available_count") - .requiredInt("instance_total_count") - .requiredInt("instance_active_count") - .requiredInt("instance_inactive_count") - .requiredInt("instance_waiting_count") - .requiredInt("instance_failed_count") + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index f4cf3e5e..81405acf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -104,19 +104,20 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" 
} }, @@ -152,11 +153,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -202,11 +204,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand @@ -246,11 +249,12 @@ class CapelinIntegrationTest { val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand -- cgit v1.2.3 From 0d8bccc68705d036fbf60f312d9c34ca4392c6b2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 7 Sep 2021 17:30:46 +0200 Subject: refactor(telemetry): Standardize SimHost metrics This change standardizes the metrics emitted by SimHost instances and their guests based on the OpenTelemetry semantic conventions. We now also report CPU time as opposed to CPU work as this metric is more commonly used. 
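For illustration only (not part of this patch): with the standardized counters, a ComputeMonitor can aggregate CPU time per host roughly as sketched below. The field names mirror the HostData columns used by the updated TestExperimentReporter further down in this patch; any additional HostData members, and the units of the counters, are assumptions rather than something this patch specifies.

    import org.opendc.telemetry.compute.ComputeMonitor
    import org.opendc.telemetry.compute.table.HostData

    // Minimal sketch of a monitor that sums the standardized CPU-time counters
    // reported for each host. The interpretation of steal/lost time below is
    // inferred from the updated integration tests (overcommission and
    // interference respectively), not stated explicitly by this patch.
    class CpuTimeMonitor : ComputeMonitor {
        var activeTime = 0L   // CPU time spent executing guest workloads
        var idleTime = 0L     // CPU time spent idle
        var stealTime = 0L    // CPU time stolen through overcommission
        var lostTime = 0L     // CPU time lost to interference
        var energyUsage = 0.0 // accumulated powerTotal samples

        override fun record(data: HostData) {
            activeTime += data.cpuActiveTime
            idleTime += data.cpuIdleTime
            stealTime += data.cpuStealTime
            lostTime += data.cpuLostTime
            energyUsage += data.powerTotal
        }
    }

This is essentially the shape of the updated TestExperimentReporter in the integration test suite below.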
--- .../org/opendc/experiments/capelin/Portfolio.kt | 3 +- .../capelin/export/parquet/ParquetDataWriter.kt | 1 - .../export/parquet/ParquetHostDataWriter.kt | 56 +++++++++++------- .../export/parquet/ParquetServerDataWriter.kt | 38 ++++++++++--- .../export/parquet/ParquetServiceDataWriter.kt | 2 +- .../experiments/capelin/CapelinIntegrationTest.kt | 66 +++++++++++----------- 6 files changed, 101 insertions(+), 65 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 3ec424f1..6261ebbf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -149,9 +149,10 @@ abstract class Portfolio(name: String) : Experiment(name) { } finally { simulator.close() metricReader.close() + monitor.close() } - val monitorResults = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index 5684bde9..e3d15c3b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -27,7 +27,6 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.example.Paper.schema import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index fa00fc35..36207045 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -44,20 +44,31 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : } override fun convert(builder: GenericRecordBuilder, data: HostData) { - builder["timestamp"] = data.timestamp + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["host_id"] = data.host.id - builder["powered_on"] = true + builder["num_cpus"] = data.host.cpuCount + builder["mem_capacity"] = data.host.memCapacity + builder["uptime"] = data.uptime builder["downtime"] = data.downtime - builder["total_work"] = data.totalWork - builder["granted_work"] = data.grantedWork - builder["overcommitted_work"] = data.overcommittedWork - 
builder["interfered_work"] = data.interferedWork - builder["cpu_usage"] = data.cpuUsage - builder["cpu_demand"] = data.cpuDemand - builder["power_draw"] = data.powerDraw - builder["num_instances"] = data.instanceCount - builder["num_cpus"] = data.host.cpuCount + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid } override fun toString(): String = "host-writer" @@ -69,18 +80,21 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .fields() .requiredLong("timestamp") .requiredString("host_id") - .requiredBoolean("powered_on") + .requiredInt("num_cpus") + .requiredLong("mem_capacity") .requiredLong("uptime") .requiredLong("downtime") - .requiredDouble("total_work") - .requiredDouble("granted_work") - .requiredDouble("overcommitted_work") - .requiredDouble("interfered_work") - .requiredDouble("cpu_usage") - .requiredDouble("cpu_demand") - .requiredDouble("power_draw") - .requiredInt("num_instances") - .requiredInt("num_cpus") + .optionalLong("boot_time") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index bb2db4b7..c5a5e7c0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -40,18 +40,31 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { return builder .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("state", true) + .withDictionaryEncoding("host_id", true) .build() } override fun convert(builder: GenericRecordBuilder, data: ServerData) { - builder["timestamp"] = data.timestamp - builder["server_id"] = data.server - // builder["state"] = data.server.state + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + builder["num_vcpus"] = data.server.cpuCount + builder["mem_capacity"] = data.server.memCapacity + builder["uptime"] = data.uptime builder["downtime"] = data.downtime - // builder["num_vcpus"] = data.server.flavor.cpuCount - // builder["mem_capacity"] = data.server.flavor.memorySize + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + builder["scheduling_latency"] = 
data.schedulingLatency + + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime } override fun toString(): String = "server-writer" @@ -63,11 +76,18 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .fields() .requiredLong("timestamp") .requiredString("server_id") - .requiredString("state") - .requiredLong("uptime") - .requiredLong("downtime") + .optionalString("host_id") .requiredInt("num_vcpus") .requiredLong("mem_capacity") + .requiredLong("uptime") + .requiredLong("downtime") + .optionalLong("boot_time") + .requiredLong("scheduling_latency") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 29b48878..d9ca55cb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -35,7 +35,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : ParquetDataWriter(path, SCHEMA, bufferSize) { override fun convert(builder: GenericRecordBuilder, data: ServiceData) { - builder["timestamp"] = data.timestamp + builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown builder["servers_pending"] = data.serversPending diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 81405acf..727530e3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -50,7 +50,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration import java.util.* -import kotlin.math.roundToLong /** * An integration test suite for the Capelin experiments. 
@@ -102,7 +101,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -118,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -151,7 +150,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -163,10 +162,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, - { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -202,7 +201,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -214,10 +213,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } }, - { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { 
assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -247,7 +246,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -259,10 +258,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } ) } @@ -286,18 +286,20 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ComputeMonitor { - var totalWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - var totalInterferedWork = 0L - var totalPowerDraw = 0.0 + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L override fun record(data: HostData) { - this.totalWork += data.totalWork.roundToLong() - totalGrantedWork += data.grantedWork.roundToLong() - totalOvercommittedWork += data.overcommittedWork.roundToLong() - totalInterferedWork += data.interferedWork.roundToLong() - totalPowerDraw += data.powerDraw + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime } } } -- cgit v1.2.3 From e2537c59bef0645b948e92553cc5a42a8c0f7256 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Sep 2021 21:34:00 +0200 Subject: feat(capelin): Use logical types for Parquet output columns This change updates the output schema for the experiment data to use logical types where possible. This adds additional context for the writer and the reader on how to process the column (efficiently). 
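As a sketch of the idea (illustration only, not part of this patch): an Avro logical type annotates a primitive column type with its intended interpretation, and the writers below combine such annotated schemas with Avro's SchemaBuilder. The record and field names in this example are hypothetical; the helper definitions mirror the AvroUtils.kt file added by this patch.

    import org.apache.avro.LogicalTypes
    import org.apache.avro.Schema
    import org.apache.avro.SchemaBuilder

    // A long column annotated as a millisecond timestamp and a string column
    // annotated as a UUID; readers can use the annotation to decode the value.
    val timestampMs: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
    val uuid: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))

    // An optional column is expressed as a union with the null type.
    fun Schema.optional(): Schema = Schema.createUnion(Schema.create(Schema.Type.NULL), this)

    // Hypothetical record built from the annotated column types.
    val example: Schema = SchemaBuilder
        .record("example").namespace("org.opendc.telemetry.compute")
        .fields()
        .name("timestamp").type(timestampMs).noDefault()
        .name("host_id").type(uuid).noDefault()
        .name("boot_time").type(timestampMs.optional()).noDefault()
        .endRecord()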
--- .../capelin/export/parquet/AvroUtils.kt | 44 ++++++++++++++++++++++ .../export/parquet/ParquetHostDataWriter.kt | 15 ++++---- .../export/parquet/ParquetServerDataWriter.kt | 18 +++++---- .../export/parquet/ParquetServiceDataWriter.kt | 2 +- 4 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt new file mode 100644 index 00000000..a4676f31 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("AvroUtils") +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema + +/** + * Schema for UUID type. + */ +internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + +/** + * Schema for timestamp type. + */ +internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. 
+ */ +internal fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 36207045..58388cb1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -47,8 +47,6 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id - builder["num_cpus"] = data.host.cpuCount - builder["mem_capacity"] = data.host.memCapacity builder["uptime"] = data.uptime builder["downtime"] = data.downtime @@ -57,12 +55,15 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["boot_time"] = bootTime.toEpochMilli() } + builder["cpu_count"] = data.host.cpuCount builder["cpu_limit"] = data.cpuLimit builder["cpu_time_active"] = data.cpuActiveTime builder["cpu_time_idle"] = data.cpuIdleTime builder["cpu_time_steal"] = data.cpuStealTime builder["cpu_time_lost"] = data.cpuLostTime + builder["mem_limit"] = data.host.memCapacity + builder["power_total"] = data.powerTotal builder["guests_terminated"] = data.guestsTerminated @@ -78,18 +79,18 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") - .requiredString("host_id") - .requiredInt("num_cpus") - .requiredLong("mem_capacity") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() .requiredLong("uptime") .requiredLong("downtime") - .optionalLong("boot_time") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") .requiredLong("cpu_time_idle") .requiredLong("cpu_time_steal") .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") .requiredDouble("power_total") .requiredInt("guests_terminated") .requiredInt("guests_running") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt index c5a5e7c0..43b5f469 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -30,6 +30,7 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.ServerData import java.io.File +import java.util.* /** * A Parquet event writer for [ServerData]s. 
@@ -49,8 +50,6 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["server_id"] = data.server.id builder["host_id"] = data.host?.id - builder["num_vcpus"] = data.server.cpuCount - builder["mem_capacity"] = data.server.memCapacity builder["uptime"] = data.uptime builder["downtime"] = data.downtime @@ -60,11 +59,14 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : } builder["scheduling_latency"] = data.schedulingLatency + builder["cpu_count"] = data.server.cpuCount builder["cpu_limit"] = data.cpuLimit builder["cpu_time_active"] = data.cpuActiveTime builder["cpu_time_idle"] = data.cpuIdleTime builder["cpu_time_steal"] = data.cpuStealTime builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity } override fun toString(): String = "server-writer" @@ -74,20 +76,20 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .record("server") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") - .requiredString("server_id") - .optionalString("host_id") - .requiredInt("num_vcpus") - .requiredLong("mem_capacity") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() .requiredLong("uptime") .requiredLong("downtime") - .optionalLong("boot_time") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() .requiredLong("scheduling_latency") + .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") .requiredLong("cpu_time_idle") .requiredLong("cpu_time_steal") .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index d9ca55cb..2928f445 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -52,7 +52,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .requiredLong("timestamp") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() .requiredInt("hosts_up") .requiredInt("hosts_down") .requiredInt("servers_pending") -- cgit v1.2.3