diff options
Diffstat (limited to 'opendc-experiments')
43 files changed, 41 insertions, 3286 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 7dadd14d..010d18b0 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,6 +31,8 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) + api(projects.opendcCompute.opendcComputeWorkload) + implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) @@ -38,16 +40,13 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.opentelemetry.semconv) - implementation(libs.kotlin.logging) implementation(libs.config) - implementation(libs.progressbar) - implementation(libs.clikt) + implementation(libs.kotlin.logging) + implementation(libs.jackson.databind) implementation(libs.jackson.module.kotlin) - implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) + implementation(libs.opentelemetry.semconv) - implementation(libs.parquet) testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6261ebbf..06db5569 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,16 +24,17 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.env.ClusterEnvironmentReader -import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -43,7 +44,6 @@ import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File -import java.io.FileInputStream import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -100,7 +100,12 @@ abstract class Portfolio(name: String) : Experiment(name) { */ override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader( + File( + config.getString("env-path"), + "${topology.name}.txt" + ) + ) val workload = workload val workloadNames = if (workload is CompositeWorkload) { @@ -117,7 +122,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() - .read(FileInputStream(config.getString("interference-model"))) + .read(File(config.getString("interference-model"))) .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -128,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) { grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) else null - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index babd8ada..8d9b24f4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin.env +import org.opendc.compute.workload.env.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt index a968b043..8d61c530 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin.env +import org.opendc.compute.workload.env.MachineDef import java.io.Closeable /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt deleted file mode 100644 index b0c0318f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.env - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* - -/** - * A definition of a machine in a cluster. - */ -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map<String, Any>, - val model: MachineModel, - val powerModel: PowerModel -) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt deleted file mode 100644 index a4676f31..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("AvroUtils") -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.LogicalTypes -import org.apache.avro.Schema - -/** - * Schema for UUID type. - */ -internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - -/** - * Schema for timestamp type. - */ -internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) - -/** - * Helper function to make a [Schema] field optional. - */ -internal fun Schema.optional(): Schema { - return Schema.createUnion(Schema.create(Schema.Type.NULL), this) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt deleted file mode 100644 index e3d15c3b..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * A writer that writes data in Parquet format. - */ -abstract class ParquetDataWriter<in T>( - path: File, - private val schema: Schema, - bufferSize: Int = 4096 -) : AutoCloseable { - /** - * The logging instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) - - /** - * An exception to be propagated to the actual writer. - */ - private var exception: Throwable? = null - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = this.toString()) { - val writer = let { - val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - buildWriter(builder) - } - - val queue = queue - val buf = mutableListOf<T>() - var shouldStop = false - - try { - while (!shouldStop) { - try { - process(writer, queue.take()) - } catch (e: InterruptedException) { - shouldStop = true - } - - if (queue.drainTo(buf) > 0) { - for (data in buf) { - process(writer, data) - } - buf.clear() - } - } - } catch (e: Throwable) { - logger.error(e) { "Failure in Parquet data writer" } - exception = e - } finally { - writer.close() - } - } - - /** - * Build the [ParquetWriter] used to write the Parquet files. - */ - protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { - return builder.build() - } - - /** - * Convert the specified [data] into a Parquet record. - */ - protected abstract fun convert(builder: GenericRecordBuilder, data: T) - - /** - * Write the specified metrics to the database. - */ - fun write(data: T) { - val exception = exception - if (exception != null) { - throw IllegalStateException("Writer thread failed", exception) - } - - queue.put(data) - } - - /** - * Signal the writer to stop. - */ - override fun close() { - writerThread.interrupt() - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt deleted file mode 100644 index b057e932..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. - */ -class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(data: ServerData) { - serverWriter.write(data) - } - - override fun record(data: HostData) { - hostWriter.write(data) - } - - override fun record(data: ServiceData) { - serviceWriter.write(data) - } - - override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 58388cb1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData -import java.io.File - -/** - * A Parquet event writer for [HostData]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { - - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun convert(builder: GenericRecordBuilder, data: HostData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - - builder["host_id"] = data.host.id - - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } - - builder["cpu_count"] = data.host.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime - - builder["mem_limit"] = data.host.memCapacity - - builder["power_total"] = data.powerTotal - - builder["guests_terminated"] = data.guestsTerminated - builder["guests_running"] = data.guestsRunning - builder["guests_error"] = data.guestsError - builder["guests_invalid"] = data.guestsInvalid - } - - override fun toString(): String = "host-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("host") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .requiredDouble("power_total") - .requiredInt("guests_terminated") - .requiredInt("guests_running") - .requiredInt("guests_error") - .requiredInt("guests_invalid") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index 43b5f469..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData -import java.io.File -import java.util.* - -/** - * A Parquet event writer for [ServerData]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { - - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun convert(builder: GenericRecordBuilder, data: ServerData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - - builder["server_id"] = data.server.id - builder["host_id"] = data.host?.id - - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } - builder["scheduling_latency"] = data.schedulingLatency - - builder["cpu_count"] = data.server.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime - - builder["mem_limit"] = data.server.memCapacity - } - - override fun toString(): String = "server-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("server") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("server_id").type(UUID_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA.optional()).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredLong("scheduling_latency") - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index 2928f445..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.export.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData -import java.io.File - -/** - * A Parquet event writer for [ServiceData]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { - - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { - builder["timestamp"] = data.timestamp.toEpochMilli() - builder["hosts_up"] = data.hostsUp - builder["hosts_down"] = data.hostsDown - builder["servers_pending"] = data.serversPending - builder["servers_active"] = data.serversActive - builder["attempts_success"] = data.attemptsSuccess - builder["attempts_failure"] = data.attemptsFailure - builder["attempts_error"] = data.attemptsError - } - - override fun toString(): String = "service-writer" - - companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("service") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("hosts_up") - .requiredInt("hosts_down") - .requiredInt("servers_pending") - .requiredInt("servers_active") - .requiredInt("attempts_success") - .requiredInt("attempts_failure") - .requiredInt("attempts_error") - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 0bf4ada6..498636ba 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,6 +22,9 @@ package org.opendc.experiments.capelin.trace +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.trace.TraceEntry +import org.opendc.compute.workload.trace.TraceReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.simulator.compute.workload.SimWorkload @@ -51,7 +54,10 @@ public class ParquetTraceReader( this.zip(listOf(workload)) } } - .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) } + .flatMap { + sampleWorkload(it.first, workload, it.second, seed) + .sortedBy(TraceEntry<SimWorkload>::start) + } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt deleted file mode 100644 index 9549af42..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from the input. - */ - fun read(input: InputStream): List<VmInterferenceGroup> { - return input.use { mapper.readValue(input) } - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt deleted file mode 100644 index ca937328..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.experiments.capelin.trace.bp.BPTraceFormat -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.* -import java.io.File -import java.util.UUID - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -class RawParquetTraceReader(private val path: File) { - /** - * The [Trace] that represents this trace. - */ - private val trace = BPTraceFormat().open(path.toURI().toURL()) - - /** - * Read the fragments into memory. - */ - private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() - - return try { - while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) - val time = reader.get(RESOURCE_STATE_TIMESTAMP) - val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - - val fragment = SimTraceWorkload.Fragment( - time.toEpochMilli(), - duration.toMillis(), - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - - var counter = 0 - val entries = mutableListOf<TraceEntry<SimWorkload>>() - - return try { - while (reader.nextRow()) { - - val id = reader.get(RESOURCE_ID) - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) - entries.add( - TraceEntry( - uid, id, submissionTime.toEpochMilli(), workload, - mapOf( - "submit-time" to submissionTime.toEpochMilli(), - "end-time" to endTime.toEpochMilli(), - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory.toLong(), - "workload" to workload - ) - ) - ) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - reader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List<TraceEntry<SimWorkload>> - - init { - val fragments = parseFragments() - entries = parseMeta(fragments) - } - - /** - * Read the entries in the trace. - */ - fun read(): List<TraceEntry<SimWorkload>> = entries -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt deleted file mode 100644 index ed82217d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.util.parquet.LocalInputFile -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param selectedVms The list of VMs to read from the trace. - */ -class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { - private val logger = KotlinLogging.logger {} - - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - time, - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf<UUID>() - val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() - - val metaReader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = selectedVms.ifEmpty { entries.keys } - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info { "Processing VM $id" } - - val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry(uid, id, submissionTime, workload, meta) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt deleted file mode 100644 index 1f3878eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.* -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA -import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA -import org.opendc.experiments.capelin.trace.sv.SvTraceFormat -import org.opendc.trace.* -import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.File -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.math.roundToLong - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -fun main(args: Array<String>): Unit = TraceConverterCli().main(args) - -/** - * Represents the command for converting traces - */ -class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The logger instance for the converter. - */ - private val logger = KotlinLogging.logger {} - - /** - * The directory where the trace should be stored. - */ - private val output by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val input by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input format of the trace. - */ - private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to SvTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) - .required() - - /** - * The sampling options. - */ - private val samplingOptions by SamplingOptions().cooccurring() - - override fun run() { - val metaParquet = File(output, "meta.parquet") - val traceParquet = File(output, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val trace = format.open(input.toURI().toURL()) - - logger.info { "Building resources table" } - - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(BP_RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() - - val selectedVms = metaWriter.use { convertResources(trace, it) } - - logger.info { "Wrote ${selectedVms.size} rows" } - logger.info { "Building resource states table" } - - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(BP_RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enableDictionaryEncoding() - .enablePageWriteChecksum() - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .build() - - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } - logger.info { "Wrote $statesCount rows" } - } - - /** - * Convert the resources table for the trace. - */ - private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> { - val random = samplingOptions?.let { Random(it.seed) } - val samplingFraction = samplingOptions?.fraction ?: 1.0 - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - var hasNextRow = reader.nextRow() - val selectedVms = mutableSetOf<String>() - - while (hasNextRow) { - var id: String - var numCpus = Int.MIN_VALUE - var memCapacity = Double.MIN_VALUE - var memUsage = Double.MIN_VALUE - var startTime = Long.MAX_VALUE - var stopTime = Long.MIN_VALUE - - do { - id = reader.get(RESOURCE_STATE_ID) - - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - startTime = min(startTime, timestamp) - stopTime = max(stopTime, timestamp) - - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) - - memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) - if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { - memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) - } - - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) - - // Sample only a fraction of the VMs - if (random != null && random.nextDouble() > samplingFraction) { - continue - } - - val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) - - builder["id"] = id - builder["submissionTime"] = startTime - builder["endTime"] = stopTime - builder["maxCores"] = numCpus - builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() - - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) - selectedVms.add(id) - } - - return selectedVms - } - - /** - * Convert the resource states table for the trace. - */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - var hasNextRow = reader.nextRow() - var count = 0 - - while (hasNextRow) { - var lastTimestamp = Long.MIN_VALUE - - do { - val id = reader.get(RESOURCE_STATE_ID) - - if (id !in selectedVms) { - hasNextRow = reader.nextRow() - continue - } - - val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) - builder["id"] = id - - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - } - - val duration = timestamp - lastTimestamp - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val flops = (cpuUsage * duration / 1000.0).roundToLong() - - builder["time"] = timestamp - builder["duration"] = duration - builder["cores"] = cores - builder["cpuUsage"] = cpuUsage - builder["flops"] = flops - - writer.write(builder.build()) - - lastTimestamp = timestamp - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) - - count++ - } - - return count - } - - /** - * Options for sampling the workload trace. - */ - private class SamplingOptions : OptionGroup() { - /** - * The fraction of VMs to sample - */ - val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") - .double() - .restrictTo(0.0001, 1.0) - .required() - - /** - * The seed for sampling the trace. - */ - val seed by option("--sampling-seed", help = "seed for sampling the workload") - .long() - .default(0) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt deleted file mode 100644 index 303a6a8c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import java.util.UUID - -/** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. - */ -public data class TraceEntry<out T>( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map<String, Any> -) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt deleted file mode 100644 index 08304edc..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -/** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. - */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index cb32ce88..b42951df 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,6 +23,7 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging +import org.opendc.compute.workload.trace.TraceEntry import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt deleted file mode 100644 index f98f4b2c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - } - - this.delegate = delegate - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } else { - null - } - } - - override fun toString(): String = "AzureCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - return AzureResourceStateTableReader(factory.createParser(path.toFile())) - } - - override fun toString(): String = "AzureResourceStateTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt deleted file mode 100644 index f80c0e82..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.opendc.trace.* -import java.time.Instant - -/** - * A [TableReader] for the Azure v1 VM resource state table. - */ -internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) - "vm id" -> id = parser.text - "avg cpu" -> cpuUsagePct = parser.doubleValue - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - else -> false - } - } - - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn<Int>): Int { - throw IllegalArgumentException("Invalid column") - } - - override fun getLong(column: TableColumn<Long>): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var timestamp: Instant? = null - private var cpuUsagePct = Double.NaN - - /** - * Reset the state. - */ - private fun reset() { - id = null - timestamp = null - cpuUsagePct = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) - .addColumn("vm id", CsvSchema.ColumnType.STRING) - .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt deleted file mode 100644 index c9d4f7eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * The resource [Table] for the Azure v1 VM traces. - */ -internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY - ) - - override fun newReader(): TableReader { - return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("No partition $partition") - } - - override fun toString(): String = "AzureResourceTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt deleted file mode 100644 index b712b854..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.core.JsonToken -import com.fasterxml.jackson.dataformat.csv.CsvParser -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import org.apache.parquet.example.Paper.schema -import org.opendc.trace.* -import java.time.Instant - -/** - * A [TableReader] for the Azure v1 VM resources table. - */ -internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { - init { - parser.schema = schema - } - - override fun nextRow(): Boolean { - reset() - - if (!nextStart()) { - return false - } - - while (true) { - val token = parser.nextValue() - - if (token == null || token == JsonToken.END_OBJECT) { - break - } - - when (parser.currentName) { - "vm id" -> id = parser.text - "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) - "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) - "vm virtual core count" -> cpuCores = parser.intValue - "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } - } - - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_START_TIME -> startTime - RESOURCE_STOP_TIME -> stopTime - RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_NCPUS -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn<Long>): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_MEM_CAPACITY -> memCapacity - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - parser.close() - } - - /** - * Advance the parser until the next object start. - */ - private fun nextStart(): Boolean { - var token = parser.nextValue() - - while (token != null && token != JsonToken.START_OBJECT) { - token = parser.nextValue() - } - - return token != null - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var startTime: Instant? = null - private var stopTime: Instant? = null - private var cpuCores = -1 - private var memCapacity = Double.NaN - - /** - * Reset the state. - */ - fun reset() { - id = null - startTime = null - stopTime = null - cpuCores = -1 - memCapacity = Double.NaN - } - - companion object { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("vm id", CsvSchema.ColumnType.NUMBER) - .addColumn("subscription id", CsvSchema.ColumnType.STRING) - .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) - .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("vm category", CsvSchema.ColumnType.NUMBER) - .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) - .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt deleted file mode 100644 index 24c60bab..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the Azure v1 VM traces. - */ -class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = name in tables - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> AzureResourceTable(factory, path) - TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) - else -> null - } - } - - override fun toString(): String = "AzureTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt deleted file mode 100644 index 744e43a0..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.azure - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import com.fasterxml.jackson.dataformat.csv.CsvParser -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the Azure v1 format. - */ -class AzureTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "azure-v1" - - /** - * The [CsvFactory] used to create the parser. - */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) - - /** - * Open the trace file. - */ - override fun open(url: URL): AzureTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return AzureTrace(factory, path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt deleted file mode 100644 index f051bf88..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the Bitbrains Parquet format. - */ -internal class BPResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, - RESOURCE_STATE_CPU_USAGE, - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) - return BPResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt deleted file mode 100644 index 0e7ee555..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant - -/** - * A [TableReader] implementation for the Bitbrains Parquet format. - */ -internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { - /** - * The current record. - */ - private var record: GenericRecord? = null - - override fun nextRow(): Boolean { - record = reader.read() - return record != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } - } - - override fun <T> get(column: TableColumn<T>): T { - val record = checkNotNull(record) { "Reader in invalid state" } - - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn<Int>): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn<Long>): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn<Double>): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "BPResourceStateTableReader" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt deleted file mode 100644 index 5b0f013f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] in the Bitbrains Parquet format. - */ -internal class BPResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY - ) - - override fun newReader(): TableReader { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) - return BPResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt deleted file mode 100644 index 4416aae8..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Instant - -/** - * A [TableReader] implementation for the Bitbrains Parquet format. - */ -internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { - /** - * The current record. - */ - private var record: GenericRecord? = null - - override fun nextRow(): Boolean { - record = reader.read() - return record != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } - } - - override fun <T> get(column: TableColumn<T>): T { - val record = checkNotNull(record) { "Reader in invalid state" } - - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(column: TableColumn<Int>): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_NCPUS -> record["maxCores"] as Int - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn<Long>): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn<Double>): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "BPResourceTableReader" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt deleted file mode 100644 index 486587b1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * A [Trace] in the Bitbrains Parquet format. - */ -public class BPTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES - - override fun getTable(name: String): Table? { - return when (name) { - TABLE_RESOURCES -> BPResourceTable(path) - TABLE_RESOURCE_STATES -> BPResourceStateTable(path) - else -> null - } - } - - override fun toString(): String = "BPTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt deleted file mode 100644 index 49d5b4c5..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the GWF trace format. - */ -public class BPTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "bitbrains-parquet" - - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): BPTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return BPTrace(path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt deleted file mode 100644 index 7dd8161d..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.bp - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder - -/** - * Schema for the resources table in the trace. - */ -val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("submissionTime") - .requiredLong("endTime") - .requiredInt("maxCores") - .requiredLong("requiredMemory") - .endRecord() - -/** - * Schema for the resource states table in the trace. - */ -val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("meta") - .namespace("org.opendc.trace.capelin") - .fields() - .requiredString("id") - .requiredLong("time") - .requiredLong("duration") - .requiredInt("cores") - .requiredDouble("cpuUsage") - .requiredLong("flops") - .endRecord() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt deleted file mode 100644 index 67140fe9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.nio.file.Files -import java.nio.file.Path -import java.util.stream.Collectors -import kotlin.io.path.bufferedReader -import kotlin.io.path.extension -import kotlin.io.path.nameWithoutExtension - -/** - * The resource state [Table] in the extended Bitbrains format. - */ -internal class SvResourceStateTable(path: Path) : Table { - /** - * The partitions that belong to the table. - */ - private val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() - - override val name: String = TABLE_RESOURCE_STATES - - override val isSynthetic: Boolean = false - - override val columns: List<TableColumn<*>> = listOf( - RESOURCE_STATE_ID, - RESOURCE_STATE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, - RESOURCE_STATE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_STATE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - ) - - override fun newReader(): TableReader { - val it = partitions.iterator() - - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - } - - this.delegate = delegate - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { - return if (it.hasNext()) { - val (_, path) = it.next() - val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) - } else { - null - } - } - - override fun toString(): String = "SvCompositeTableReader" - } - } - - override fun newReader(partition: String): TableReader { - val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } - val reader = path.bufferedReader() - return SvResourceStateTableReader(reader) - } - - override fun toString(): String = "SvResourceStateTable" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt deleted file mode 100644 index 6ea403fe..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.io.BufferedReader -import java.time.Instant - -/** - * A [TableReader] for the Bitbrains resource state table. - */ -internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - override fun nextRow(): Boolean { - reset() - - var line: String - var num = 0 - - while (true) { - line = reader.readLine() ?: return false - num++ - - if (line[0] == '#' || line.isBlank()) { - // Ignore empty lines or comments - continue - } - - break - } - - line = line.trim() - - val length = line.length - var col = 0 - var start: Int - var end = 0 - - while (end < length) { - // Trim all whitespace before the field - start = end - while (start < length && line[start].isWhitespace()) { - start++ - } - - end = line.indexOf(' ', start) - - if (end < 0) { - end = length - } - - val field = line.subSequence(start, end) as String - when (col++) { - COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> cpuUsage = field.toDouble() - COL_CPU_DEMAND -> cpuDemand = field.toDouble() - COL_DISK_READ -> diskRead = field.toDouble() - COL_DISK_WRITE -> diskWrite = field.toDouble() - COL_CLUSTER_ID -> cluster = field.trim() - COL_NCPUS -> cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() - COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() - COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() - } - } - - return true - } - - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_CLUSTER_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_CPU_DEMAND -> true - RESOURCE_STATE_CPU_READY_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - else -> false - } - } - - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_STATE_ID -> id - RESOURCE_STATE_CLUSTER_ID -> cluster - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) - RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) - RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) - RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) - RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) - RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) - else -> throw IllegalArgumentException("Invalid column") - } - - @Suppress("UNCHECKED_CAST") - return res as T - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - return when (column) { - RESOURCE_STATE_POWERED_ON -> poweredOn - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(column: TableColumn<Long>): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity - RESOURCE_STATE_CPU_USAGE -> cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity - RESOURCE_STATE_CPU_DEMAND -> cpuDemand - RESOURCE_STATE_MEM_CAPACITY -> memCapacity - RESOURCE_STATE_DISK_READ -> diskRead - RESOURCE_STATE_DISK_WRITE -> diskWrite - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun close() { - reader.close() - } - - /** - * State fields of the reader. - */ - private var id: String? = null - private var cluster: String? = null - private var timestamp: Instant? = null - private var cpuCores = -1 - private var cpuCapacity = Double.NaN - private var cpuUsage = Double.NaN - private var cpuDemand = Double.NaN - private var cpuReadyPct = Double.NaN - private var memCapacity = Double.NaN - private var diskRead = Double.NaN - private var diskWrite = Double.NaN - private var poweredOn: Boolean = false - - /** - * Reset the state of the reader. - */ - private fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } - - /** - * Default column indices for the extended Bitbrains format. - */ - private val COL_TIMESTAMP = 0 - private val COL_CPU_USAGE = 1 - private val COL_CPU_DEMAND = 2 - private val COL_DISK_READ = 4 - private val COL_DISK_WRITE = 6 - private val COL_CLUSTER_ID = 10 - private val COL_NCPUS = 12 - private val COL_CPU_READY_PCT = 13 - private val COL_POWERED_ON = 14 - private val COL_CPU_CAPACITY = 18 - private val COL_ID = 19 - private val COL_MEM_CAPACITY = 20 -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt deleted file mode 100644 index dbd63de5..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.* -import java.nio.file.Path - -/** - * [Trace] implementation for the extended Bitbrains format. - */ -public class SvTrace internal constructor(private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return SvResourceStateTable(path) - } - - override fun toString(): String = "SvTrace[$path]" -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt deleted file mode 100644 index 0cce8559..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace.sv - -import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists - -/** - * A format implementation for the extended Bitbrains trace format. - */ -public class SvTraceFormat : TraceFormat { - /** - * The name of this trace format. - */ - override val name: String = "sv" - - /** - * Open the trace file. - */ - override fun open(url: URL): SvTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SvTrace(path) - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt deleted file mode 100644 index 065a8c93..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/ComputeServiceSimulator.kt +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.util - -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.env.MachineDef -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock -import kotlin.coroutines.CoroutineContext -import kotlin.math.max - -/** - * Helper class to manage a [ComputeService] simulation. - */ -class ComputeServiceSimulator( - private val context: CoroutineContext, - private val clock: Clock, - scheduler: ComputeScheduler, - machines: List<MachineDef>, - private val failureModel: FailureModel? = null, - interferenceModel: VmInterferenceModel? = null, - hypervisorProvider: SimHypervisorProvider = SimFairShareHypervisorProvider() -) : AutoCloseable { - /** - * The [ComputeService] that has been configured by the manager. - */ - val service: ComputeService - - /** - * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. - */ - val producers: List<MetricProducer> - get() = _metricProducers - private val _metricProducers = mutableListOf<MetricProducer>() - - /** - * The [SimResourceInterpreter] to simulate the hosts. - */ - private val interpreter = SimResourceInterpreter(context, clock) - - /** - * The hosts that belong to this class. - */ - private val hosts = mutableSetOf<SimHost>() - - init { - val (service, serviceMeterProvider) = createService(scheduler) - this._metricProducers.add(serviceMeterProvider) - this.service = service - - for (def in machines) { - val (host, hostMeterProvider) = createHost(def, hypervisorProvider, interferenceModel) - this._metricProducers.add(hostMeterProvider) - hosts.add(host) - this.service.addHost(host) - } - } - - /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. - */ - suspend fun run(reader: TraceReader<SimWorkload>) { - val injector = failureModel?.createInjector(context, clock, service) - val client = service.newClient() - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - injector?.start() - - var offset = Long.MIN_VALUE - - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) - - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) - - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - - // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) - - // Delete the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - - yield() - } finally { - injector?.close() - reader.close() - client.close() - } - } - - override fun close() { - service.close() - - for (host in hosts) { - host.close() - } - - hosts.clear() - } - - /** - * Construct a [ComputeService] instance. - */ - private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val service = ComputeService(context, clock, meterProvider, scheduler) - return service to meterProvider - } - - /** - * Construct a [SimHost] instance for the specified [MachineDef]. - */ - private fun createHost( - def: MachineDef, - hypervisorProvider: SimHypervisorProvider, - interferenceModel: VmInterferenceModel? = null - ): Pair<SimHost, SdkMeterProvider> { - val resource = Resource.builder() - .put(HOST_ID, def.uid.toString()) - .put(HOST_NAME, def.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, def.model.cpus.size) - .put(HOST_MEM_CAPACITY, def.model.memory.sumOf { it.size }) - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val host = SimHost( - def.uid, - def.name, - def.model, - def.meta, - context, - interpreter, - meterProvider, - hypervisorProvider, - powerDriver = SimplePowerDriver(def.powerModel), - interferenceDomain = interferenceModel?.newDomain() - ) - - return host to meterProvider - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt deleted file mode 100644 index 83393896..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModel.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.util - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.failure.HostFaultInjector -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts. - */ -interface FailureModel { - /** - * Construct a [HostFaultInjector] for the specified [service]. - */ - fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt deleted file mode 100644 index 89b4a31c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/FailureModels.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("FailureModels") -package org.opendc.experiments.capelin - -import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.commons.math3.random.Well19937c -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.failure.HostFaultInjector -import org.opendc.compute.simulator.failure.StartStopHostFault -import org.opendc.compute.simulator.failure.StochasticVictimSelector -import org.opendc.experiments.capelin.util.FailureModel -import java.time.Clock -import java.time.Duration -import kotlin.coroutines.CoroutineContext -import kotlin.math.ln -import kotlin.random.Random - -/** - * Obtain a [FailureModel] based on the GRID'5000 failure trace. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -fun grid5000(failureInterval: Duration, seed: Int): FailureModel { - return object : FailureModel { - override fun createInjector( - context: CoroutineContext, - clock: Clock, - service: ComputeService - ): HostFaultInjector { - val rng = Well19937c(seed) - val hosts = service.hosts.map { it as SimHost }.toSet() - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) - } - - override fun toString(): String = "Grid5000FailureModel" - } -} - -/** - * Obtain the [HostFaultInjector] to use for the experiments. - * - * This fault injector uses parameters from the GRID'5000 failure trace as described in - * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. - */ -fun createFaultInjector( - context: CoroutineContext, - clock: Clock, - hosts: Set<SimHost>, - seed: Int, - failureInterval: Double -): HostFaultInjector { - val rng = Well19937c(seed) - - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return HostFaultInjector( - context, - clock, - hosts, - iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), - fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) - ) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt index b55bd577..67de2777 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/util/VmPlacementReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.trace +package org.opendc.experiments.capelin.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml index d1c01b8e..d46b50c3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml +++ b/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml @@ -36,7 +36,7 @@ <Logger name="org.opendc.experiments.capelin" level="info" additivity="false"> <AppenderRef ref="Console"/> </Logger> - <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false"> + <Logger name="org.opendc.experiments.vm.trace" level="debug" additivity="false"> <AppenderRef ref="Console"/> </Logger> <Logger name="org.apache.hadoop" level="warn" additivity="false"> diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 727530e3..b0f86346 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,14 +31,15 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.trace.RawParquetTraceReader +import org.opendc.compute.workload.trace.TraceReader +import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader -import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader -import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.experiments.capelin.trace.TraceReader -import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -85,7 +86,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -134,7 +135,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -184,7 +185,7 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, @@ -229,7 +230,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val simulator = ComputeServiceSimulator( + val simulator = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt deleted file mode 100644 index fbc39b87..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll - -/** - * Test suite for the [PerformanceInterferenceReader] class. - */ -class PerformanceInterferenceReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) - } -} |
