From e8cdfbcec3f75b3f303ce52bac5f5595a94555e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:14:46 +0200 Subject: refactor(trace): Extract Parquet helpers into separate module This change extracts the Parquet helpers outside format module into a new module, in order to improve re-usability of these helpers. --- opendc-experiments/opendc-experiments-capelin/build.gradle.kts | 1 + .../opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt | 2 +- .../org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt | 2 +- .../org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt | 2 +- .../main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt | 2 +- 5 files changed, 5 insertions(+), 4 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 53643aba..e597c5ad 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -32,6 +32,7 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcFormat) + implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt index d8f7ff75..897a6692 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt @@ -28,7 +28,7 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.experiments.capelin.telemetry.Event -import org.opendc.format.util.LocalOutputFile +import org.opendc.trace.util.parquet.LocalOutputFile import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 16ad6816..2630784b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -25,9 +25,9 @@ package org.opendc.experiments.capelin.trace import org.apache.avro.generic.GenericData import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.util.parquet.LocalParquetReader import java.io.File import java.util.UUID diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 35f4c5b8..9b5d0f47 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -32,9 +32,9 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalInputFile import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.trace.util.parquet.LocalInputFile import java.io.File import java.io.Serializable import java.util.SortedSet diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index d7daa35b..e64f997f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -37,8 +37,8 @@ import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.format.trace.bitbrains.BitbrainsTraceReader -import org.opendc.format.util.LocalOutputFile import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File import java.io.FileReader -- cgit v1.2.3 From 9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:44:47 +0200 Subject: refactor(format): Remove environment reader from format library This change removes the environment reader from the format library since they are highly specific for the particular experiment. In the future, we hope to have a single format to setup the entire datacenter (perhaps similar to the format used by the web runner). --- .../experiments/capelin/ExperimentHelpers.kt | 2 +- .../capelin/env/ClusterEnvironmentReader.kt | 2 -- .../experiments/capelin/env/EnvironmentReader.kt | 35 ++++++++++++++++++++ .../opendc/experiments/capelin/env/MachineDef.kt | 38 ++++++++++++++++++++++ .../capelin/trace/StreamingParquetTraceReader.kt | 2 +- .../experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../opendc-experiments-tf20/build.gradle.kts | 5 ++- .../experiments/tf20/TensorFlowExperiment.kt | 3 +- .../experiments/tf20/util/MLEnvironmentReader.kt | 17 +++++----- .../org/opendc/experiments/tf20/util/MachineDef.kt | 38 ++++++++++++++++++++++ 10 files changed, 128 insertions(+), 16 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt create mode 100644 opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 7f428b2a..20dd603f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -45,9 +45,9 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index d73d14f5..babd8ada 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.capelin.env -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt new file mode 100644 index 00000000..a968b043 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/EnvironmentReader.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.env + +import java.io.Closeable + +/** + * An interface for reading descriptions of topology environments into memory. + */ +public interface EnvironmentReader : Closeable { + /** + * Read the environment into a list. + */ + public fun read(): List +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt new file mode 100644 index 00000000..b0c0318f --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.env + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map, + val model: MachineModel, + val powerModel: PowerModel +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 9b5d0f47..9bcbdc75 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -187,7 +187,7 @@ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = e assert(uid !in takenIds) takenIds += uid - logger.info("Processing VM $id") + logger.info { "Processing VM $id" } val internalBuffer = mutableListOf() val externalBuffer = mutableListOf() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index e4d3fed3..9d6329d1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -36,12 +36,12 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader +import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index b088045b..882c4894 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -34,8 +34,11 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcFormat) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.jackson.module.kotlin) { + exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") + } + implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30") } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 9a48aced..2153a862 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { .build() val meter = meterProvider.get("opendc-tf20") - val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first() + val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) + val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], def.model.memory[0], LinearPowerModel(250.0, 60.0) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index 3e61f508..3cdf28fd 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -25,8 +25,6 @@ package org.opendc.experiments.tf20.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -36,13 +34,16 @@ import java.io.InputStream import java.util.* /** - * An [EnvironmentReader] for the TensorFlow experiments. + * An environment reader for the TensorFlow experiments. */ -public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { +public class MLEnvironmentReader { + /** + * The [ObjectMapper] to convert the format. + */ + private val mapper = jacksonObjectMapper() - private val setup: Setup = mapper.readValue(input) - - override fun read(): List { + public fun readEnvironment(input: InputStream): List { + val setup: Setup = mapper.readValue(input) var counter = 0 return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack } } } - - override fun close() {} } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt new file mode 100644 index 00000000..271f5923 --- /dev/null +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.tf20.util + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map, + val model: MachineModel, + val powerModel: PowerModel +) -- cgit v1.2.3 From 214480d154771f0b783829b6e5ec82b837304ad2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 16:18:56 +0200 Subject: refactor(trace): Move Bitbrains format into separate module This change moves Bitbrains trace support into a separate module and adds support for the new trace api. --- .../opendc-experiments-capelin/build.gradle.kts | 1 + .../experiments/capelin/trace/TraceConverter.kt | 84 ++++++++++++++++------ 2 files changed, 63 insertions(+), 22 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index e597c5ad..97ca97ec 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcFormat) implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index e64f997f..0ded32f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.bitbrains.BitbrainsTraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.trace.* +import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { metaWriter: ParquetWriter ): MutableList { val fragments = mutableListOf() - BitbrainsTraceReader(traceDirectory).use { reader -> - reader.forEach { entry -> - val trace = (entry.workload as SimTraceWorkload).trace - var maxTime = Long.MIN_VALUE - trace.forEach { fragment -> - val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL()) + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var lastId: String? = null + var maxCores = Int.MIN_VALUE + var requiredMemory = Long.MIN_VALUE + var minTime = Long.MAX_VALUE + var maxTime = Long.MIN_VALUE + var lastTimestamp = Long.MIN_VALUE + + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + + if (lastId != null && lastId != id) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", lastId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + lastId = id + + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) + val timestampMs = timestamp.toEpochMilli() + val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) + + maxCores = max(maxCores, cores) + requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) + + if (lastTimestamp < 0) { + lastTimestamp = timestampMs - 5 * 60 * 1000L + minTime = min(minTime, lastTimestamp) + } + + if (fragments.isEmpty()) { + val duration = 5 * 60 * 1000L + val flops: Long = (cpuUsage * duration / 1000).toLong() + fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) + } else { + val last = fragments.last() + val duration = timestampMs - lastTimestamp + val flops: Long = (cpuUsage * duration / 1000).toLong() + + // Perform run-length encoding + if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { + fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) + } else { fragments.add( Fragment( - entry.name, - fragment.timestamp, + id, + lastTimestamp, flops, - fragment.duration, - fragment.usage, - fragment.cores + duration, + cpuUsage, + cores ) ) - maxTime = max(maxTime, fragment.timestamp + fragment.duration) } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.name) - metaRecord.put("submissionTime", entry.start) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", entry.meta["cores"]) - metaRecord.put("requiredMemory", entry.meta["required-memory"]) - metaWriter.write(metaRecord) } + + val last = fragments.last() + maxTime = max(maxTime, last.tick + last.duration) + lastTimestamp = timestampMs } return fragments } -- cgit v1.2.3 From b2308e1077dc60ec6a4dc646613a4be5b59695a6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 1 Sep 2021 11:29:27 +0200 Subject: refactor(trace): Implement trace API for WTF reader This change updates the WTF trace reader to support the new streaming trace API. --- .../opendc-experiments-capelin/build.gradle.kts | 4 +- .../experiments/capelin/ExperimentHelpers.kt | 2 +- .../capelin/trace/ParquetTraceReader.kt | 2 - .../capelin/trace/RawParquetTraceReader.kt | 2 - .../capelin/trace/StreamingParquetTraceReader.kt | 2 - .../opendc/experiments/capelin/trace/TraceEntry.kt | 44 ++++++++++++++++++++++ .../experiments/capelin/trace/TraceReader.kt | 32 ++++++++++++++++ .../experiments/capelin/trace/WorkloadSampler.kt | 1 - .../experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../opendc-experiments-energy21/build.gradle.kts | 1 - 10 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 97ca97ec..65cebe1b 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) @@ -45,6 +44,9 @@ dependencies { implementation(libs.config) implementation(libs.progressbar) implementation(libs.clikt) + implementation(libs.jackson.module.kotlin) { + exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") + } implementation(libs.parquet) testImplementation(libs.log4j.slf4j) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 20dd603f..46e11056 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -48,7 +48,7 @@ import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.format.trace.TraceReader +import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.power.SimplePowerDriver diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 0f49ecd2..0bf4ada6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -24,8 +24,6 @@ package org.opendc.experiments.capelin.trace import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 2630784b..24ff0ba1 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -23,8 +23,6 @@ package org.opendc.experiments.capelin.trace import org.apache.avro.generic.GenericData -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.trace.util.parquet.LocalParquetReader diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 9bcbdc75..61e4cab5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -30,8 +30,6 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.trace.util.parquet.LocalInputFile diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt new file mode 100644 index 00000000..303a6a8c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceEntry.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import java.util.UUID + +/** + * An entry in a workload trace. + * + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. + */ +public data class TraceEntry( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map +) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt new file mode 100644 index 00000000..08304edc --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceReader.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +/** + * An interface for reading workloads into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 6de3f265..cb32ce88 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -26,7 +26,6 @@ import mu.KotlinLogging import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry import org.opendc.simulator.compute.workload.SimWorkload import java.util.* import kotlin.random.Random diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 9d6329d1..2934bbe6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -42,7 +42,7 @@ import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader -import org.opendc.format.trace.TraceReader +import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index bc05f09b..7d34d098 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) -- cgit v1.2.3 From 8bae0f3053a53aac9d483ae97d99f2e7e80b42ef Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 1 Sep 2021 22:30:39 +0200 Subject: refactor(capelin): Migrate trace reader to new trace API This change updates the trace reading classes in the Capelin experiment to use the new trace API in order to re-use many of the trace reading parts. --- .../capelin/trace/RawParquetTraceReader.kt | 62 +++--- .../experiments/capelin/trace/TraceConverter.kt | 195 ++--------------- .../capelin/trace/bp/BPResourceStateTable.kt | 56 +++++ .../capelin/trace/bp/BPResourceStateTableReader.kt | 103 +++++++++ .../capelin/trace/bp/BPResourceTable.kt | 56 +++++ .../capelin/trace/bp/BPResourceTableReader.kt | 103 +++++++++ .../opendc/experiments/capelin/trace/bp/BPTrace.kt | 49 +++++ .../experiments/capelin/trace/bp/BPTraceFormat.kt | 47 +++++ .../capelin/trace/sv/SvResourceStateTable.kt | 140 +++++++++++++ .../capelin/trace/sv/SvResourceStateTableReader.kt | 230 +++++++++++++++++++++ .../opendc/experiments/capelin/trace/sv/SvTrace.kt | 45 ++++ .../experiments/capelin/trace/sv/SvTraceFormat.kt | 47 +++++ 12 files changed, 927 insertions(+), 206 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 24ff0ba1..fa4e9ed8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -22,10 +22,10 @@ package org.opendc.experiments.capelin.trace -import org.apache.avro.generic.GenericData +import org.opendc.experiments.capelin.trace.bp.BPTraceFormat import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.util.parquet.LocalParquetReader +import org.opendc.trace.* import java.io.File import java.util.UUID @@ -35,27 +35,30 @@ import java.util.UUID * @param path The directory of the traces. */ class RawParquetTraceReader(private val path: File) { + /** + * The [Trace] that represents this trace. + */ + private val trace = BPTraceFormat().open(path.toURI().toURL()) + /** * Read the fragments into memory. */ - private fun parseFragments(path: File): Map> { - val reader = LocalParquetReader(File(path, "trace.parquet")) + private fun parseFragments(): Map> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val fragments = mutableMapOf>() return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + val time = reader.get(RESOURCE_STATE_TIMESTAMP) + val duration = reader.get(RESOURCE_STATE_DURATION) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( - time, - duration, + time.toEpochMilli(), + duration.toMillis(), cpuUsage, cores ) @@ -72,25 +75,24 @@ class RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map>): List> { - val metaReader = LocalParquetReader(File(path, "meta.parquet")) + private fun parseMeta(fragments: Map>): List> { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() var counter = 0 val entries = mutableListOf>() return try { - while (true) { - val record = metaReader.read() ?: break + while (reader.nextRow()) { - val id = record["id"].toString() + val id = reader.get(RESOURCE_ID) if (!fragments.containsKey(id)) { continue } - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long + val submissionTime = reader.get(RESOURCE_START_TIME) + val endTime = reader.get(RESOURCE_END_TIME) + val maxCores = reader.getInt(RESOURCE_NCPUS) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() @@ -98,13 +100,13 @@ class RawParquetTraceReader(private val path: File) { val workload = SimTraceWorkload(vmFragments) entries.add( TraceEntry( - uid, id, submissionTime, workload, + uid, id, submissionTime.toEpochMilli(), workload, mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, + "submit-time" to submissionTime.toEpochMilli(), + "end-time" to endTime.toEpochMilli(), "total-load" to totalLoad, "cores" to maxCores, - "required-memory" to requiredMemory, + "required-memory" to requiredMemory.toLong(), "workload" to workload ) ) @@ -116,7 +118,7 @@ class RawParquetTraceReader(private val path: File) { e.printStackTrace() throw e } finally { - metaReader.close() + reader.close() } } @@ -126,8 +128,8 @@ class RawParquetTraceReader(private val path: File) { private val entries: List> init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) + val fragments = parseFragments() + entries = parseMeta(fragments) } /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index 0ded32f3..a021de8d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -36,8 +36,10 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.experiments.capelin.trace.sv.SvTraceFormat import org.opendc.trace.* import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -156,189 +158,19 @@ sealed class TraceConversion(name: String) : OptionGroup(name) { ): MutableList } -class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } - .required() - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - } - - return allFragments - } -} - /** - * Conversion of the Bitbrains public trace. + * A [TraceConversion] that uses the Trace API to perform the conversion. */ -class BitbrainsConversion : TraceConversion("Bitbrains") { +abstract class AbstractConversion(name: String) : TraceConversion(name) { + abstract val format: TraceFormat + override fun read( traceDirectory: File, metaSchema: Schema, metaWriter: ParquetWriter ): MutableList { val fragments = mutableListOf() - val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL()) + val trace = format.open(traceDirectory.toURI().toURL()) val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() var lastId: String? = null @@ -364,7 +196,7 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) val timestampMs = timestamp.toEpochMilli() - val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val cores = reader.getInt(RESOURCE_STATE_NCPUS) val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) @@ -410,6 +242,17 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { } } +class SolvinityConversion : AbstractConversion("Solvinity") { + override val format: TraceFormat = SvTraceFormat() +} + +/** + * Conversion of the Bitbrains public trace. + */ +class BitbrainsConversion : AbstractConversion("Bitbrains") { + override val format: TraceFormat = BitbrainsTraceFormat() +} + /** * Conversion of the Azure public VM trace. */ diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt new file mode 100644 index 00000000..35bfd5ef --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource state [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceStateTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCE_STATES + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_DURATION -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_USAGE -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = LocalParquetReader(path.resolve("trace.parquet")) + return BPResourceStateTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt new file mode 100644 index 00000000..0e7ee555 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_DURATION -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_USAGE -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_STATE_ID -> record["id"].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) + RESOURCE_STATE_NCPUS -> record["cores"] + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_STATE_NCPUS -> record["cores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (column) { + RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceStateTableReader" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt new file mode 100644 index 00000000..74d1e574 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.nio.file.Path + +/** + * The resource [Table] in the Bitbrains Parquet format. + */ +internal class BPResourceTable(private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_END_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = LocalParquetReader(path.resolve("meta.parquet")) + return BPResourceTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt new file mode 100644 index 00000000..0a105783 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Instant + +/** + * A [TableReader] implementation for the Bitbrains Parquet format. + */ +internal class BPResourceTableReader(private val reader: LocalParquetReader) : TableReader { + /** + * The current record. + */ + private var record: GenericRecord? = null + + override fun nextRow(): Boolean { + record = reader.read() + return record != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_END_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val record = checkNotNull(record) { "Reader in invalid state" } + + @Suppress("UNCHECKED_CAST") + val res: Any = when (column) { + RESOURCE_ID -> record["id"].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) + RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> record["maxCores"] + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_NCPUS -> record["maxCores"] as Int + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (column) { + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + override fun toString(): String = "BPResourceTableReader" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt new file mode 100644 index 00000000..486587b1 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.opendc.trace.TABLE_RESOURCES +import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * A [Trace] in the Bitbrains Parquet format. + */ +public class BPTrace internal constructor(private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = + name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES + + override fun getTable(name: String): Table? { + return when (name) { + TABLE_RESOURCES -> BPResourceTable(path) + TABLE_RESOURCE_STATES -> BPResourceStateTable(path) + else -> null + } + } + + override fun toString(): String = "BPTrace[$path]" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt new file mode 100644 index 00000000..49d5b4c5 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the GWF trace format. + */ +public class BPTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "bitbrains-parquet" + + /** + * Open a Bitbrains Parquet trace. + */ + override fun open(url: URL): BPTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return BPTrace(path) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt new file mode 100644 index 00000000..71c9d52e --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.bufferedReader +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] in the Bitbrains format. + */ +internal class SvResourceStateTable(path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_CPU_DEMAND -> true + RESOURCE_STATE_CPU_READY_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + else -> false + } + } + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun get(column: TableColumn): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (partition, path) = it.next() + val reader = path.bufferedReader() + return SvResourceStateTableReader(partition, reader) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + val reader = path.bufferedReader() + return SvResourceStateTableReader(partition, reader) + } + + override fun toString(): String = "SvResourceStateTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt new file mode 100644 index 00000000..adcdb2ea --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.* +import java.io.BufferedReader +import java.time.Instant + +/** + * A [TableReader] for the Bitbrains resource state table. + */ +internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + override fun nextRow(): Boolean { + state.reset() + + var line: String + var num = 0 + + while (true) { + line = reader.readLine() ?: return false + num++ + + if (line[0] == '#' || line.isBlank()) { + // Ignore empty lines or comments + continue + } + + break + } + + line = line.trim() + + val length = line.length + var col = 0 + var start = 0 + var end = 0 + + while (end < length) { + // Trim all whitespace before the field + start = end + while (start < length && line[start].isWhitespace()) { + start++ + } + + end = line.indexOf(' ', start) + + if (end < 0) { + break + } + + val field = line.subSequence(start, end) as String + when (col++) { + COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> state.cpuUsage = field.toDouble() + COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() + COL_DISK_READ -> state.diskRead = field.toDouble() + COL_DISK_WRITE -> state.diskWrite = field.toDouble() + COL_CLUSTER_ID -> state.cluster = field.trim() + COL_NCPUS -> state.cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() + COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() + COL_ID -> state.id = field.trim() + COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_CLUSTER_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_CPU_DEMAND -> true + RESOURCE_STATE_CPU_READY_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> state.id + RESOURCE_STATE_CLUSTER_ID -> state.cluster + RESOURCE_STATE_TIMESTAMP -> state.timestamp + RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + return when (column) { + RESOURCE_STATE_POWERED_ON -> state.poweredOn + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + RESOURCE_STATE_NCPUS -> state.cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + reader.close() + } + + /** + * The current row state. + */ + private class RowState { + @JvmField + var id: String? = null + @JvmField + var cluster: String? = null + @JvmField + var timestamp: Instant? = null + @JvmField + var cpuCores = -1 + @JvmField + var cpuCapacity = Double.NaN + @JvmField + var cpuUsage = Double.NaN + @JvmField + var cpuDemand = Double.NaN + @JvmField + var cpuReadyPct = Double.NaN + @JvmField + var memCapacity = Double.NaN + @JvmField + var diskRead = Double.NaN + @JvmField + var diskWrite = Double.NaN + @JvmField + var poweredOn: Boolean = false + + /** + * Reset the state. + */ + fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false + } + } + + /** + * Default column indices for the extended Bitbrains format. + */ + private val COL_TIMESTAMP = 0 + private val COL_CPU_USAGE = 1 + private val COL_CPU_DEMAND = 2 + private val COL_DISK_READ = 4 + private val COL_DISK_WRITE = 6 + private val COL_CLUSTER_ID = 10 + private val COL_NCPUS = 12 + private val COL_CPU_READY_PCT = 13 + private val COL_POWERED_ON = 14 + private val COL_CPU_CAPACITY = 18 + private val COL_ID = 19 + private val COL_MEM_CAPACITY = 20 +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt new file mode 100644 index 00000000..dbd63de5 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the extended Bitbrains format. + */ +public class SvTrace internal constructor(private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return SvResourceStateTable(path) + } + + override fun toString(): String = "SvTrace[$path]" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt new file mode 100644 index 00000000..0cce8559 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.sv + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the extended Bitbrains trace format. + */ +public class SvTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "sv" + + /** + * Open the trace file. + */ + override fun open(url: URL): SvTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return SvTrace(path) + } +} -- cgit v1.2.3