diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-31 16:18:56 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-02 11:30:15 +0200 |
| commit | 214480d154771f0b783829b6e5ec82b837304ad2 (patch) | |
| tree | 84d823132bdd0e351ec5a41c210be6551a98273d | |
| parent | 9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 (diff) | |
refactor(trace): Move Bitbrains format into separate module
This change moves Bitbrains trace support into a separate module and
adds support for the new trace API.
13 files changed, 635 insertions, 269 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index e597c5ad..97ca97ec 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcFormat) implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index e64f997f..0ded32f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.bitbrains.BitbrainsTraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.trace.* +import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> { val fragments = mutableListOf<Fragment>() - 
BitbrainsTraceReader(traceDirectory).use { reader -> - reader.forEach { entry -> - val trace = (entry.workload as SimTraceWorkload).trace - var maxTime = Long.MIN_VALUE - trace.forEach { fragment -> - val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL()) + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var lastId: String? = null + var maxCores = Int.MIN_VALUE + var requiredMemory = Long.MIN_VALUE + var minTime = Long.MAX_VALUE + var maxTime = Long.MIN_VALUE + var lastTimestamp = Long.MIN_VALUE + + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + + if (lastId != null && lastId != id) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", lastId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + lastId = id + + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) + val timestampMs = timestamp.toEpochMilli() + val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) + + maxCores = max(maxCores, cores) + requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) + + if (lastTimestamp < 0) { + lastTimestamp = timestampMs - 5 * 60 * 1000L + minTime = min(minTime, lastTimestamp) + } + + if (fragments.isEmpty()) { + val duration = 5 * 60 * 1000L + val flops: Long = (cpuUsage * duration / 1000).toLong() + fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) + } else { + val last = fragments.last() + val duration = timestampMs - lastTimestamp + val flops: Long = (cpuUsage * duration / 1000).toLong() + + // Perform run-length encoding + if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { + 
fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) + } else { fragments.add( Fragment( - entry.name, - fragment.timestamp, + id, + lastTimestamp, flops, - fragment.duration, - fragment.usage, - fragment.cores + duration, + cpuUsage, + cores ) ) - maxTime = max(maxTime, fragment.timestamp + fragment.duration) } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.name) - metaRecord.put("submissionTime", entry.start) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", entry.meta["cores"]) - metaRecord.put("requiredMemory", entry.meta["required-memory"]) - metaWriter.write(metaRecord) } + + val last = fragments.last() + maxTime = max(maxTime, last.tick + last.duration) + lastTimestamp = timestampMs } return fragments } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt deleted file mode 100644 index ff6cdd02..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.MappingIterator -import com.fasterxml.jackson.dataformat.csv.CsvMapper -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import java.io.InputStream - -/** - * A trace reader that enables the user to read Bitbrains specific trace data. - */ -public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - - /** - * The mapping iterator to use. 
- */ - private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema) - .readValues(input) - - override fun hasNext(): Boolean { - return iterator.hasNext() - } - - override fun next(): Entry { - return iterator.next() - } - - override fun close() { - iterator.close() - } - - /** - * A single entry in the trace. - */ - public data class Entry( - @JsonProperty("Timestamp [ms]") - val timestamp: Long, - @JsonProperty("CPU cores") - val cpuCores: Int, - @JsonProperty("CPU capacity provisioned [MHZ]") - val cpuCapacity: Double, - @JsonProperty("CPU usage [MHZ]") - val cpuUsage: Double, - @JsonProperty("CPU usage [%]") - val cpuUsagePct: Double, - @JsonProperty("Memory capacity provisioned [KB]") - val memCapacity: Double, - @JsonProperty("Memory usage [KB]") - val memUsage: Double, - @JsonProperty("Disk read throughput [KB/s]") - val diskRead: Double, - @JsonProperty("Disk write throughput [KB/s]") - val diskWrite: Double, - @JsonProperty("Network received throughput [KB/s]") - val netReceived: Double, - @JsonProperty("Network transmitted throughput [KB/s]") - val netTransmitted: Double - ) -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt deleted file mode 100644 index 9e4876df..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the 
following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.FileInputStream -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - */ -public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. 
- */ - init { - val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .forEach { vmFile -> - val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>() - var vmId = -1L - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var startTime = Long.MAX_VALUE - var lastTimestamp = Long.MIN_VALUE - - BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> - reader.forEach { entry -> - val timestamp = entry.timestamp * 1000L - val cpuUsage = entry.cpuUsage - vmId = vmFile.nameWithoutExtension.trim().toLong() - val cores = entry.cpuCores - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) - - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - startTime = min(startTime, lastTimestamp) - } - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) - } else { - val last = flopsHistory.last() - val duration = timestamp - lastTimestamp - // Perform run-length encoding - if (duration == 0L || last.usage == cpuUsage) { - flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) - } else { - flopsHistory.add( - SimTraceWorkload.Fragment( - lastTimestamp, - duration, - cpuUsage, - cores - ) - ) - } - } - - lastTimestamp = timestamp - } - } - - val uuid = UUID(0L, vmId) - - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[vmId] = TraceEntry( - uuid, - vmId.toString(), - startTime, - workload, - mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override 
fun close() {} -} diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts index 48b4a2e3..d195cbbb 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -20,26 +20,17 @@ * SOFTWARE. */ -package org.opendc.format.trace.bitbrains +description = "Support for GWF traces in OpenDC" -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -/** - * Test suite for the [BitbrainsTraceReader] class. - */ -class BitbrainsTraceReaderTest { - @Test - fun testSmoke() { - val file = BitbrainsTraceReaderTest::class.java.getResourceAsStream("/bitbrains.csv")!! - BitbrainsRawTraceReader(file).use { reader -> - val entry = reader.next() +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} - assertAll( - { assertEquals(1376314846, entry.timestamp) }, - { assertEquals(19.066, entry.cpuUsage, 0.01) } - ) - } - } +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + implementation(libs.jackson.dataformat.csv) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt new file mode 100644 index 00000000..767ef919 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, 
including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] in the Bitbrains format. + */ +internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table { + /** + * The partitions that belong to the table. 
+ */ + private val partitions = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_MEM_USAGE -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + RESOURCE_STATE_NET_RX -> true + RESOURCE_STATE_NET_TX -> true + else -> false + } + } + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? = nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun <T> get(column: TableColumn<T>): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn<Int>): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn<Long>): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun 
getDouble(column: TableColumn<Double>): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (partition, path) = it.next() + return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) + } else { + null + } + } + + override fun toString(): String = "BitbrainsCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile())) + } + + override fun toString(): String = "BitbrainsResourceStateTable" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt new file mode 100644 index 00000000..5687ac7f --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Bitbrains resource state table. + */ +internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + // Reset the row state + state.reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) + "CPU cores" -> state.cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue + "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue + "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue + "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue + "Memory usage [KB]" -> state.memUsage = parser.doubleValue + "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue + "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue + "Network received throughput [KB/s]" -> 
state.netReceived = parser.doubleValue + "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_CAPACITY -> true + RESOURCE_STATE_CPU_USAGE -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + RESOURCE_STATE_MEM_CAPACITY -> true + RESOURCE_STATE_MEM_USAGE -> true + RESOURCE_STATE_DISK_READ -> true + RESOURCE_STATE_DISK_WRITE -> true + RESOURCE_STATE_NET_RX -> true + RESOURCE_STATE_NET_TX -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> partition + RESOURCE_STATE_TIMESTAMP -> state.timestamp + RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_MEM_USAGE -> state.memUsage + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_NET_RX -> state.netReceived + RESOURCE_STATE_NET_TX -> state.netTransmitted + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + RESOURCE_STATE_NCPUS -> state.cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + return when (column) { + RESOURCE_STATE_CPU_CAPACITY -> 
state.cpuCapacity + RESOURCE_STATE_CPU_USAGE -> state.cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity + RESOURCE_STATE_MEM_USAGE -> state.memUsage + RESOURCE_STATE_DISK_READ -> state.diskRead + RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_NET_RX -> state.netReceived + RESOURCE_STATE_NET_TX -> state.netTransmitted + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * The current row state. + */ + private class RowState { + var timestamp: Instant? = null + var cpuCores = -1 + var cpuCapacity = Double.NaN + var cpuUsage = Double.NaN + var cpuUsagePct = Double.NaN + var memCapacity = Double.NaN + var memUsage = Double.NaN + var diskRead = Double.NaN + var diskWrite = Double.NaN + var netReceived = Double.NaN + var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. 
+ */ + private val schema = CsvSchema.builder() + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(';') + .build() + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt new file mode 100644 index 00000000..5a2d4243 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the Bitbrains format. + */ +public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + + override fun getTable(name: String): Table? 
{ + if (!containsTable(name)) { + return null + } + + return BitbrainsResourceStateTable(factory, path) + } + + override fun toString(): String = "BitbrainsTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt new file mode 100644 index 00000000..55b11fe3 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the GWF trace format. + */ +public class BitbrainsTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "bitbrains" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Open a Bitbrains trace. + */ + override fun open(url: URL): BitbrainsTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return BitbrainsTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..f18135d0 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.bitbrains.BitbrainsTraceFormat diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt new file mode 100644 index 00000000..550805d3 --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the 
Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.RESOURCE_STATE_CPU_USAGE +import org.opendc.trace.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.TABLE_RESOURCE_STATES +import java.net.URL + +/** + * Test suite for the [BitbrainsTraceFormat] class. 
+ */ +class BitbrainsTraceFormatTest { + @Test + fun testTraceExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + assertDoesNotThrow { + format.open(url) + } + } + + @Test + fun testTraceDoesNotExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + assertThrows<IllegalArgumentException> { + format.open(URL(url.toString() + "help")) + } + } + + @Test + fun testTables() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + } + + @Test + fun testTableExists() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testSmoke() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + ) + + reader.close() + } +} diff --git a/opendc-format/src/test/resources/bitbrains.csv 
b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv index f5e300e8..f5e300e8 100644 --- a/opendc-format/src/test/resources/bitbrains.csv +++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv diff --git a/settings.gradle.kts b/settings.gradle.kts index ec697d80..fd1d404a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -48,6 +48,7 @@ include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") +include(":opendc-trace:opendc-trace-bitbrains") include(":opendc-trace:opendc-trace-parquet") include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") |
