diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-17 17:12:20 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-17 17:12:20 +0200 |
| commit | c1b9719aad10566c9d17f9eb757236c58a602b89 (patch) | |
| tree | 2755ea2d44256116e6dc08a57a64b37a36331249 /opendc-experiments/opendc-experiments-capelin/src | |
| parent | 2cd3bd18e548a72d64afe0e7f59487f4747d722f (diff) | |
| parent | e2537c59bef0645b948e92553cc5a42a8c0f7256 (diff) | |
merge: Standardize simulator metrics
This pull request standardizes the metrics emitted by the simulator based on OpenTelemetry conventions.
From now on, all metrics exposed by the simulator are exported through OpenTelemetry
following the recommended practices for naming, collection, etc.
**Implementation Notes**
- Improve ParquetDataWriter implementation
- Simplify CoroutineMetricReader
- Create separate MeterProvider per service/host
- Standardize compute scheduler metrics
- Standardize SimHost metrics
- Use logical types for Parquet output columns
**External Dependencies**
- Update to OpenTelemetry 1.6.0
**Breaking API Changes**
- Instead of supplying a `Meter` instances, key classes are now responsible for constructing
a `Meter` instance from the supplied `MeterProvider`.
- Export format has been changed to suit the outputted metrics
- Energy experiments shell has been removed
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
13 files changed, 913 insertions, 534 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? = null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader<SimWorkload>, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..6261ebbf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. */ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,49 +122,44 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() monitor.close() } - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.activeHostCount}" + "Scheduler " + + "Success=${monitorResults.attemptsSuccess} " + + "Failure=${monitorResults.attemptsFailure} " + + "Error=${monitorResults.attemptsError} " + + "Pending=${monitorResults.serversPending} " + + "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt new file mode 100644 index 00000000..a4676f31 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("AvroUtils") +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema + +/** + * Schema for UUID type. + */ +internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + +/** + * Schema for timestamp type. + */ +internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. + */ +internal fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index c5cb80e2..e3d15c3b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -25,11 +25,12 @@ package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,50 +39,94 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. */ -public open class ParquetDataWriter<in T>( - private val path: File, +abstract class ParquetDataWriter<in T>( + path: File, private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { + bufferSize: Int = 4096 +) : AutoCloseable { /** * The logging instance to use. */ private val logger = KotlinLogging.logger {} /** - * The writer to write the Parquet file. + * The queue of commands to process. */ - private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) /** - * The queue of commands to process. + * An exception to be propagated to the actual writer. */ - private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) + private var exception: Throwable? = null /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = false, name = this.toString()) { run() } + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) /** * Write the specified metrics to the database. */ - public fun write(event: T) { - queue.put(Action.Write(event)) + fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) } /** * Signal the writer to stop. */ - public override fun close() { - queue.put(Action.Stop) + override fun close() { + writerThread.interrupt() writerThread.join() } @@ -90,38 +135,11 @@ public open class ParquetDataWriter<in T>( } /** - * Start the writer thread. + * Process the specified [data] to be written to the Parquet file. */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.data as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write<out T>(val data: T) : Action() + private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index 79b84e9d..b057e932 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val hostWriter = ParquetHostDataWriter( File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + private val serviceWriter = ParquetServiceDataWriter( File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + override fun record(data: ServerData) { + serverWriter.write(data) + } + override fun record(data: HostData) { hostWriter.write(data) } @@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int override fun close() { hostWriter.close() serviceWriter.close() + serverWriter.close() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 8912c12e..58388cb1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -25,6 +25,9 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -32,42 +35,67 @@ import java.io.File * A Parquet event writer for [HostData]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, schema, convert, bufferSize) { + ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { - override fun toString(): String = "host-writer" + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp.toEpochMilli() - public companion object { - private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> - record.put("host_id", data.host.name) - record.put("state", data.host.state.name) - record.put("timestamp", data.timestamp) - record.put("total_work", data.totalWork) - record.put("granted_work", data.grantedWork) - record.put("overcommitted_work", data.overcommittedWork) - record.put("interfered_work", data.interferedWork) - record.put("cpu_usage", data.cpuUsage) - record.put("cpu_demand", data.cpuDemand) - record.put("power_draw", data.powerDraw) - record.put("instance_count", data.instanceCount) - record.put("cores", data.host.model.cpuCount) + builder["host_id"] = data.host.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() } - private val schema: Schema = SchemaBuilder + builder["cpu_count"] = data.host.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.host.memCapacity + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid + } + + override fun toString(): String = "host-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("requested_work").type().longType().noDefault() - .name("granted_work").type().longType().noDefault() - .name("overcommitted_work").type().longType().noDefault() - .name("interfered_work").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("instance_count").type().intType().noDefault() - .name("cores").type().intType().noDefault() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..43b5f469 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import java.io.File +import java.util.* + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_count"] = data.server.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity + } + + override fun toString(): String = "server-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredLong("scheduling_latency") + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 36d630f3..2928f445 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceData import java.io.File @@ -32,34 +32,34 @@ import java.io.File * A Parquet event writer for [ServiceData]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) { + ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { - override fun toString(): String = "service-writer" + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError + } - public companion object { - private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> - record.put("timestamp", data.timestamp) - record.put("host_total_count", data.hostCount) - record.put("host_available_count", data.activeHostCount) - record.put("instance_total_count", data.instanceCount) - record.put("instance_active_count", data.runningInstanceCount) - record.put("instance_inactive_count", data.finishedInstanceCount) - record.put("instance_waiting_count", data.queuedInstanceCount) - record.put("instance_failed_count", data.failedInstanceCount) - } + override fun toString(): String = "service-writer" - private val schema: Schema = SchemaBuilder + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("instance_total_count").type().intType().noDefault() - .name("instance_active_count").type().intType().noDefault() - .name("instance_inactive_count").type().intType().noDefault() - .name("instance_waiting_count").type().intType().noDefault() - .name("instance_failed_count").type().intType().noDefault() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a [ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List<MachineDef>, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + */ + suspend fun run(reader: TraceReader<SimWorkload>) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair<SimHost, SdkMeterProvider> { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 44cf92a8..727530e3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* /** @@ -60,11 +61,20 @@ class CapelinIntegrationTest { private lateinit var monitor: TestExperimentReporter /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** * Setup the experimental environment. */ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,45 +82,46 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -120,41 +131,41 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -164,10 +175,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,34 +184,39 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -214,53 +226,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } ) } @@ -284,18 +286,20 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ComputeMonitor { - var totalWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - var totalInterferedWork = 0L - var totalPowerDraw = 0.0 + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() - totalPowerDraw += data.powerDraw + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime } } } |
