diff options
Diffstat (limited to 'opendc-experiments')
17 files changed, 889 insertions, 781 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 8227bca9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import kotlinx.coroutines.* -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.ReplayScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher -import org.opendc.compute.service.scheduler.weights.RamWeigher -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - interferenceModel: VmInterferenceModel? = null, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace( - clock: Clock, - reader: TraceReader<SimWorkload>, - scheduler: ComputeService, - monitor: ComputeMonitor? = null, -) { - val client = scheduler.newClient() - val watcher = object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) - } - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - server.watch(watcher) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} - -/** - * Create a [MeterProvider] instance for the experiment. - */ -fun createMeterProvider(clock: Clock): MeterProvider { - return SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() -} - -/** - * Create a [ComputeScheduler] for the experiment. - */ -fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { - val cpuAllocationRatio = 16.0 - val ramAllocationRatio = 1.5 - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = 1.0)) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(RamWeigher(multiplier = -1.0)) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(CoreRamWeigher(multiplier = -1.0)) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), - weighers = emptyList(), - subsetSize = Int.MAX_VALUE, - random = java.util.Random(seeder.nextLong()) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 82794471..6261ebbf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,10 +23,7 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import mu.KotlinLogging -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -36,17 +33,21 @@ import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator +import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.io.FileInputStream +import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom +import kotlin.math.roundToLong /** * A portfolio represents a collection of scenarios are tested for the work. @@ -97,28 +98,23 @@ abstract class Portfolio(name: String) : Experiment(name) { /** * Perform a single trial for this portfolio. */ - @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) - - val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } } else { listOf(workload.name) } - val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(FileInputStream(config.getString("interference-model"))) @@ -126,49 +122,44 @@ abstract class Portfolio(name: String) : Experiment(name) { else null - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) + val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + else + null + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environment.read(), + failureModel, + performanceInterferenceModel + ) val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) - withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val faultInjector = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seeder.nextInt(), - operationalPhenomena.failureFrequency, - ) - } else { - null - } - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector?.start() - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - - faultInjector?.close() + try { + simulator.run(trace) + } finally { + simulator.close() + metricReader.close() monitor.close() } - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.instant(), simulator.producers[0]) logger.debug { - "Finish " + - "SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.activeHostCount}" + "Scheduler " + + "Success=${monitorResults.attemptsSuccess} " + + "Failure=${monitorResults.attemptsFailure} " + + "Error=${monitorResults.attemptsError} " + + "Pending=${monitorResults.serversPending} " + + "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt index cc58e5f1..a4676f31 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt @@ -20,28 +20,25 @@ * SOFTWARE. */ -description = "Experiments for the OpenDC Energy work" +@file:JvmName("AvroUtils") +package org.opendc.experiments.capelin.export.parquet -/* Build configuration */ -plugins { - `experiment-conventions` - `testing-conventions` -} +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema -dependencies { - api(platform(projects.opendcPlatform)) - api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcExperiments.opendcExperimentsCapelin) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.kotlin.logging) - implementation(libs.config) +/** + * Schema for UUID type. + */ +internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - implementation(libs.parquet) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } +/** + * Schema for timestamp type. + */ +internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. + */ +internal fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt index c5cb80e2..e3d15c3b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -25,11 +25,12 @@ package org.opendc.experiments.capelin.export.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,50 +39,94 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. */ -public open class ParquetDataWriter<in T>( - private val path: File, +abstract class ParquetDataWriter<in T>( + path: File, private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { + bufferSize: Int = 4096 +) : AutoCloseable { /** * The logging instance to use. */ private val logger = KotlinLogging.logger {} /** - * The writer to write the Parquet file. + * The queue of commands to process. */ - private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) /** - * The queue of commands to process. + * An exception to be propagated to the actual writer. */ - private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) + private var exception: Throwable? = null /** * The thread that is responsible for writing the Parquet records. */ - private val writerThread = thread(start = false, name = this.toString()) { run() } + private val writerThread = thread(start = false, name = this.toString()) { + val writer = let { + val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + buildWriter(builder) + } + + val queue = queue + val buf = mutableListOf<T>() + var shouldStop = false + + try { + while (!shouldStop) { + try { + process(writer, queue.take()) + } catch (e: InterruptedException) { + shouldStop = true + } + + if (queue.drainTo(buf) > 0) { + for (data in buf) { + process(writer, data) + } + buf.clear() + } + } + } catch (e: Throwable) { + logger.error(e) { "Failure in Parquet data writer" } + exception = e + } finally { + writer.close() + } + } + + /** + * Build the [ParquetWriter] used to write the Parquet files. + */ + protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder.build() + } + + /** + * Convert the specified [data] into a Parquet record. + */ + protected abstract fun convert(builder: GenericRecordBuilder, data: T) /** * Write the specified metrics to the database. */ - public fun write(event: T) { - queue.put(Action.Write(event)) + fun write(data: T) { + val exception = exception + if (exception != null) { + throw IllegalStateException("Writer thread failed", exception) + } + + queue.put(data) } /** * Signal the writer to stop. */ - public override fun close() { - queue.put(Action.Stop) + override fun close() { + writerThread.interrupt() writerThread.join() } @@ -90,38 +135,11 @@ public open class ParquetDataWriter<in T>( } /** - * Start the writer thread. + * Process the specified [data] to be written to the Parquet file. */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.data as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write<out T>(val data: T) : Action() + private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { + val builder = GenericRecordBuilder(schema) + convert(builder, data) + writer.write(builder.build()) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt index 79b84e9d..b057e932 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val hostWriter = ParquetHostDataWriter( File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + private val serviceWriter = ParquetServiceDataWriter( File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize ) + override fun record(data: ServerData) { + serverWriter.write(data) + } + override fun record(data: HostData) { hostWriter.write(data) } @@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int override fun close() { hostWriter.close() serviceWriter.close() + serverWriter.close() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt index 8912c12e..58388cb1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -25,6 +25,9 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.HostData import java.io.File @@ -32,42 +35,67 @@ import java.io.File * A Parquet event writer for [HostData]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, schema, convert, bufferSize) { + ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { - override fun toString(): String = "host-writer" + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: HostData) { + builder["timestamp"] = data.timestamp.toEpochMilli() - public companion object { - private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> - record.put("host_id", data.host.name) - record.put("state", data.host.state.name) - record.put("timestamp", data.timestamp) - record.put("total_work", data.totalWork) - record.put("granted_work", data.grantedWork) - record.put("overcommitted_work", data.overcommittedWork) - record.put("interfered_work", data.interferedWork) - record.put("cpu_usage", data.cpuUsage) - record.put("cpu_demand", data.cpuDemand) - record.put("power_draw", data.powerDraw) - record.put("instance_count", data.instanceCount) - record.put("cores", data.host.model.cpuCount) + builder["host_id"] = data.host.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() } - private val schema: Schema = SchemaBuilder + builder["cpu_count"] = data.host.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.host.memCapacity + + builder["power_total"] = data.powerTotal + + builder["guests_terminated"] = data.guestsTerminated + builder["guests_running"] = data.guestsRunning + builder["guests_error"] = data.guestsError + builder["guests_invalid"] = data.guestsInvalid + } + + override fun toString(): String = "host-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("host") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("requested_work").type().longType().noDefault() - .name("granted_work").type().longType().noDefault() - .name("overcommitted_work").type().longType().noDefault() - .name("interfered_work").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("instance_count").type().intType().noDefault() - .name("cores").type().intType().noDefault() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .requiredDouble("power_total") + .requiredInt("guests_terminated") + .requiredInt("guests_running") + .requiredInt("guests_error") + .requiredInt("guests_invalid") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt new file mode 100644 index 00000000..43b5f469 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.telemetry.compute.table.ServerData +import java.io.File +import java.util.* + +/** + * A Parquet event writer for [ServerData]s. + */ +public class ParquetServerDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + + override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + return builder + .withDictionaryEncoding("server_id", true) + .withDictionaryEncoding("host_id", true) + .build() + } + + override fun convert(builder: GenericRecordBuilder, data: ServerData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + + builder["server_id"] = data.server.id + builder["host_id"] = data.host?.id + + builder["uptime"] = data.uptime + builder["downtime"] = data.downtime + val bootTime = data.bootTime + if (bootTime != null) { + builder["boot_time"] = bootTime.toEpochMilli() + } + builder["scheduling_latency"] = data.schedulingLatency + + builder["cpu_count"] = data.server.cpuCount + builder["cpu_limit"] = data.cpuLimit + builder["cpu_time_active"] = data.cpuActiveTime + builder["cpu_time_idle"] = data.cpuIdleTime + builder["cpu_time_steal"] = data.cpuStealTime + builder["cpu_time_lost"] = data.cpuLostTime + + builder["mem_limit"] = data.server.memCapacity + } + + override fun toString(): String = "server-writer" + + companion object { + private val SCHEMA: Schema = SchemaBuilder + .record("server") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .name("server_id").type(UUID_SCHEMA).noDefault() + .name("host_id").type(UUID_SCHEMA.optional()).noDefault() + .requiredLong("uptime") + .requiredLong("downtime") + .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() + .requiredLong("scheduling_latency") + .requiredInt("cpu_count") + .requiredDouble("cpu_limit") + .requiredLong("cpu_time_active") + .requiredLong("cpu_time_idle") + .requiredLong("cpu_time_steal") + .requiredLong("cpu_time_lost") + .requiredLong("mem_limit") + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt index 36d630f3..2928f445 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceData import java.io.File @@ -32,34 +32,34 @@ import java.io.File * A Parquet event writer for [ServiceData]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) { + ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { - override fun toString(): String = "service-writer" + override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + builder["timestamp"] = data.timestamp.toEpochMilli() + builder["hosts_up"] = data.hostsUp + builder["hosts_down"] = data.hostsDown + builder["servers_pending"] = data.serversPending + builder["servers_active"] = data.serversActive + builder["attempts_success"] = data.attemptsSuccess + builder["attempts_failure"] = data.attemptsFailure + builder["attempts_error"] = data.attemptsError + } - public companion object { - private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> - record.put("timestamp", data.timestamp) - record.put("host_total_count", data.hostCount) - record.put("host_available_count", data.activeHostCount) - record.put("instance_total_count", data.instanceCount) - record.put("instance_active_count", data.runningInstanceCount) - record.put("instance_inactive_count", data.finishedInstanceCount) - record.put("instance_waiting_count", data.queuedInstanceCount) - record.put("instance_failed_count", data.failedInstanceCount) - } + override fun toString(): String = "service-writer" - private val schema: Schema = SchemaBuilder + companion object { + private val SCHEMA: Schema = SchemaBuilder .record("service") .namespace("org.opendc.telemetry.compute") .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("instance_total_count").type().intType().noDefault() - .name("instance_active_count").type().intType().noDefault() - .name("instance_inactive_count").type().intType().noDefault() - .name("instance_waiting_count").type().intType().noDefault() - .name("instance_failed_count").type().intType().noDefault() + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("hosts_up") + .requiredInt("hosts_down") + .requiredInt("servers_pending") + .requiredInt("servers_active") + .requiredInt("attempts_success") + .requiredInt("attempts_failure") + .requiredInt("attempts_error") .endRecord() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt new file mode 100644 index 00000000..3b7c3f0f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeSchedulers.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeSchedulers") +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import java.util.* + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt new file mode 100644 index 00000000..065a8c93 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.MachineDef +import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.SimHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.max + +/** + * Helper class to manage a [ComputeService] simulation. + */ +class ComputeServiceSimulator( + private val context: CoroutineContext, + private val clock: Clock, + scheduler: ComputeScheduler, + machines: List<MachineDef>, + private val failureModel: FailureModel? = null, + interferenceModel: VmInterferenceModel? = null, + hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() +) : AutoCloseable { + /** + * The [ComputeService] that has been configured by the manager. + */ + val service: ComputeService + + /** + * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. + */ + val producers: List<MetricProducer> + get() = _metricProducers + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The [SimResourceInterpreter] to simulate the hosts. + */ + private val interpreter = SimResourceInterpreter(context, clock) + + /** + * The hosts that belong to this class. + */ + private val hosts = mutableSetOf<SimHost>() + + init { + val (service, serviceMeterProvider) = createService(scheduler) + this._metricProducers.add(serviceMeterProvider) + this.service = service + + for (def in machines) { + val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) + this._metricProducers.add(hostMeterProvider) + hosts.add(host) + this.service.addHost(host) + } + } + + /** + * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + */ + suspend fun run(reader: TraceReader<SimWorkload>) { + val injector = failureModel?.createInjector(context, clock, service) + val client = service.newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } + delay(max(0, (entry.start - offset) - clock.millis())) + + launch { + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + mapOf("workload" to workload) + ) + + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) + + // Delete the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + + yield() + } finally { + injector?.close() + reader.close() + client.close() + } + } + + override fun close() { + service.close() + + for (host in hosts) { + host.close() + } + + hosts.clear() + } + + /** + * Construct a [ComputeService] instance. + */ + private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val service = ComputeService(context, clock, meterProvider, scheduler) + return service to meterProvider + } + + /** + * Construct a [SimHost] instance for the specified [MachineDef]. + */ + private fun createHost( + def: MachineDef, + hypervisorProvider: SimHypervisorProvider, + interferenceModel: VmInterferenceModel? = null + ): Pair<SimHost, SdkMeterProvider> { + val resource = Resource.builder() + .put(HOST_ID, def.uid.toString()) + .put(HOST_NAME, def.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, def.model.cpus.size) + .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + + val host = SimHost( + def.uid, + def.name, + def.model, + def.meta, + context, + interpreter, + meterProvider, + hypervisorProvider, + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() + ) + + return host to meterProvider + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt new file mode 100644 index 00000000..83393896 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.util + +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.failure.HostFaultInjector +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. + */ +interface FailureModel { + /** + * Construct a [HostFaultInjector] for the specified [service]. + */ + fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt new file mode 100644 index 00000000..89b4a31c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("FailureModels") +package org.opendc.experiments.capelin + +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c +import org.opendc.compute.service.ComputeService +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector +import org.opendc.experiments.capelin.util.FailureModel +import java.time.Clock +import java.time.Duration +import kotlin.coroutines.CoroutineContext +import kotlin.math.ln +import kotlin.random.Random + +/** + * Obtain a [FailureModel] based on the GRID'5000 failure trace. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun grid5000(failureInterval: Duration, seed: Int): FailureModel { + return object : FailureModel { + override fun createInjector( + context: CoroutineContext, + clock: Clock, + service: ComputeService + ): HostFaultInjector { + val rng = Well19937c(seed) + val hosts = service.hosts.map { it as SimHost }.toSet() + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) + } + + override fun toString(): String = "Grid5000FailureModel" + } +} + +/** + * Obtain the [HostFaultInjector] to use for the experiments. + * + * This fault injector uses parameters from the GRID'5000 failure trace as described in + * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. + */ +fun createFaultInjector( + context: CoroutineContext, + clock: Clock, + hosts: Set<SimHost>, + seed: Int, + failureInterval: Double +): HostFaultInjector { + val rng = Well19937c(seed) + + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return HostFaultInjector( + context, + clock, + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 44cf92a8..727530e3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* /** @@ -60,11 +61,20 @@ class CapelinIntegrationTest { private lateinit var monitor: TestExperimentReporter /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** * Setup the experimental environment. */ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,45 +82,46 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -120,41 +131,41 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -164,10 +175,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,34 +184,39 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -214,53 +226,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } ) } @@ -284,18 +286,20 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ComputeMonitor { - var totalWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - var totalInterferedWork = 0L - var totalPowerDraw = 0.0 + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() - totalPowerDraw += data.powerDraw + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime } } } diff --git a/opendc-experiments/opendc-experiments-energy21/.gitignore b/opendc-experiments/opendc-experiments-energy21/.gitignore deleted file mode 100644 index 55da79f8..00000000 --- a/opendc-experiments/opendc-experiments-energy21/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -input/ -output/ -.ipynb_checkpoints diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt deleted file mode 100644 index d9194969..00000000 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.energy21 - -import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import mu.KotlinLogging -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor -import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.* -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.withMonitor -import java.io.File -import java.time.Clock -import java.util.* - -/** - * Experiments for the OpenDC project on Energy modeling. - */ -public class EnergyExperiment : Experiment("Energy Modeling 2021") { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The configuration to use. - */ - private val config = ConfigFactory.load().getConfig("opendc.experiments.energy21") - - /** - * The traces to test. - */ - private val trace by anyOf("solvinity") - - /** - * The power models to test. - */ - private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION) - - override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(), - subsetSize = Int.MAX_VALUE - ) - - val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) - - withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - trace, - scheduler, - monitor - ) - } - } - - val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) - logger.debug { - "Finish SUBMIT=${monitorResults.instanceCount} " + - "FAIL=${monitorResults.failedInstanceCount} " + - "QUEUE=${monitorResults.queuedInstanceCount} " + - "RUNNING=${monitorResults.runningInstanceCount}" - } - } - - /** - * Construct the environment for a simulated compute service.. - */ - public suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - scheduler: ComputeScheduler, - block: suspend CoroutineScope.(ComputeService) -> Unit - ): Unit = coroutineScope { - val model = createMachineModel() - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = List(64) { id -> - SimHost( - UUID(0, id.toLong()), - "node-$id", - model, - emptyMap(), - coroutineContext, - interpreter, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - PerformanceScalingGovernor(), - powerModel.driver - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } - } - - /** - * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html - */ - private fun createMachineModel(): MachineModel { - val node = ProcessingNode("AMD", "am64", "EPYC 7742", 64) - val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } - val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } - - return MachineModel(cpus, memory) - } - - /** - * The power models to test. - */ - public enum class PowerModelType { - CUBIC { - override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4)) - }, - - LINEAR { - override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4)) - }, - - SQRT { - override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4)) - }, - - SQUARE { - override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4)) - }, - - INTERPOLATION { - override val driver: PowerDriver = SimplePowerDriver( - InterpolationPowerModel( - listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0) - ) - ) - }, - - MSE { - override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4)) - }, - - ASYMPTOTIC { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false)) - }, - - ASYMPTOTIC_DVFS { - override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true)) - }; - - public abstract val driver: PowerDriver - } -} diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf b/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf deleted file mode 100644 index 263da0fe..00000000 --- a/opendc-experiments/opendc-experiments-energy21/src/main/resources/application.conf +++ /dev/null @@ -1,14 +0,0 @@ -# Default configuration for the energy experiments -opendc.experiments.energy21 { - # Path to the directory containing the input traces - trace-path = input/traces - - # Path to the output directory to write the results to - output-path = output -} - -opendc.experiments.capelin { - env-path = input/environments/ - trace-path = input/traces/ - output-path = output -} diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 650416f5..3312d6c0 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -46,6 +46,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.time.Duration import java.util.* import kotlin.math.max @@ -85,7 +86,7 @@ public class ServerlessExperiment : Experiment("Serverless") { val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10L * 60 * 1000)) + FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) val client = service.newClient() coroutineScope { |
