summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-31 16:18:56 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 11:30:15 +0200
commit214480d154771f0b783829b6e5ec82b837304ad2 (patch)
tree84d823132bdd0e351ec5a41c210be6551a98273d
parent9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 (diff)
refactor(trace): Move Bitbrains format into separate module
This change moves Bitbrains trace support into a separate module and adds support for the new trace api.
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt84
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt100
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt127
-rw-r--r--opendc-trace/opendc-trace-bitbrains/build.gradle.kts (renamed from opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt)31
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt139
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt218
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt56
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt100
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv (renamed from opendc-format/src/test/resources/bitbrains.csv)0
-rw-r--r--settings.gradle.kts1
13 files changed, 635 insertions, 269 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index e597c5ad..97ca97ec 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -33,6 +33,7 @@ dependencies {
api(projects.opendcHarness.opendcHarnessApi)
implementation(projects.opendcFormat)
implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index e64f997f..0ded32f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
val fragments = mutableListOf<Fragment>()
- BitbrainsTraceReader(traceDirectory).use { reader ->
- reader.forEach { entry ->
- val trace = (entry.workload as SimTraceWorkload).trace
- var maxTime = Long.MIN_VALUE
- trace.forEach { fragment ->
- val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL())
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+ var lastId: String? = null
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var minTime = Long.MAX_VALUE
+ var maxTime = Long.MIN_VALUE
+ var lastTimestamp = Long.MIN_VALUE
+
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+
+ if (lastId != null && lastId != id) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", lastId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ lastId = id
+
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val timestampMs = timestamp.toEpochMilli()
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
+
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
+
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestampMs - 5 * 60 * 1000L
+ minTime = min(minTime, lastTimestamp)
+ }
+
+ if (fragments.isEmpty()) {
+ val duration = 5 * 60 * 1000L
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+ fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
+ } else {
+ val last = fragments.last()
+ val duration = timestampMs - lastTimestamp
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+
+ // Perform run-length encoding
+ if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
+ fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
+ } else {
fragments.add(
Fragment(
- entry.name,
- fragment.timestamp,
+ id,
+ lastTimestamp,
flops,
- fragment.duration,
- fragment.usage,
- fragment.cores
+ duration,
+ cpuUsage,
+ cores
)
)
- maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.name)
- metaRecord.put("submissionTime", entry.start)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", entry.meta["cores"])
- metaRecord.put("requiredMemory", entry.meta["required-memory"])
- metaWriter.write(metaRecord)
}
+
+ val last = fragments.last()
+ maxTime = max(maxTime, last.tick + last.duration)
+ lastTimestamp = timestampMs
}
return fragments
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
deleted file mode 100644
index ff6cdd02..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.MappingIterator
-import com.fasterxml.jackson.dataformat.csv.CsvMapper
-import com.fasterxml.jackson.dataformat.csv.CsvSchema
-import java.io.InputStream
-
-/**
- * A trace reader that enables the user to read Bitbrains specific trace data.
- */
-public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable {
- /**
- * The [CsvSchema] that is used to parse the trace.
- */
- private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(';')
- .build()
-
- /**
- * The mapping iterator to use.
- */
- private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema)
- .readValues(input)
-
- override fun hasNext(): Boolean {
- return iterator.hasNext()
- }
-
- override fun next(): Entry {
- return iterator.next()
- }
-
- override fun close() {
- iterator.close()
- }
-
- /**
- * A single entry in the trace.
- */
- public data class Entry(
- @JsonProperty("Timestamp [ms]")
- val timestamp: Long,
- @JsonProperty("CPU cores")
- val cpuCores: Int,
- @JsonProperty("CPU capacity provisioned [MHZ]")
- val cpuCapacity: Double,
- @JsonProperty("CPU usage [MHZ]")
- val cpuUsage: Double,
- @JsonProperty("CPU usage [%]")
- val cpuUsagePct: Double,
- @JsonProperty("Memory capacity provisioned [KB]")
- val memCapacity: Double,
- @JsonProperty("Memory usage [KB]")
- val memUsage: Double,
- @JsonProperty("Disk read throughput [KB/s]")
- val diskRead: Double,
- @JsonProperty("Disk write throughput [KB/s]")
- val diskWrite: Double,
- @JsonProperty("Network received throughput [KB/s]")
- val netReceived: Double,
- @JsonProperty("Network transmitted throughput [KB/s]")
- val netTransmitted: Double
- )
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
deleted file mode 100644
index 9e4876df..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.bitbrains
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.File
-import java.io.FileInputStream
-import java.util.*
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the public VM workload trace format.
- *
- * @param traceDirectory The directory of the traces.
- */
-public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * Initialize the reader.
- */
- init {
- val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
- val traceInterval = 5 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" }
- .forEach { vmFile ->
- val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
- var vmId = -1L
- var maxCores = Int.MIN_VALUE
- var requiredMemory = Long.MIN_VALUE
- var startTime = Long.MAX_VALUE
- var lastTimestamp = Long.MIN_VALUE
-
- BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader ->
- reader.forEach { entry ->
- val timestamp = entry.timestamp * 1000L
- val cpuUsage = entry.cpuUsage
- vmId = vmFile.nameWithoutExtension.trim().toLong()
- val cores = entry.cpuCores
- maxCores = max(maxCores, cores)
- requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong())
-
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- startTime = min(startTime, lastTimestamp)
- }
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores))
- } else {
- val last = flopsHistory.last()
- val duration = timestamp - lastTimestamp
- // Perform run-length encoding
- if (duration == 0L || last.usage == cpuUsage) {
- flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration)
- } else {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- lastTimestamp,
- duration,
- cpuUsage,
- cores
- )
- )
- }
- }
-
- lastTimestamp = timestamp
- }
- }
-
- val uuid = UUID(0L, vmId)
-
- val workload = SimTraceWorkload(flopsHistory.asSequence())
- entries[vmId] = TraceEntry(
- uuid,
- vmId.toString(),
- startTime,
- workload,
- mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- }
-
- // Create the entry iterator
- iterator = entries.values.sortedBy { it.start }.iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
index 48b4a2e3..d195cbbb 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReaderTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
@@ -20,26 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.format.trace.bitbrains
+description = "Support for GWF traces in OpenDC"
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-/**
- * Test suite for the [BitbrainsTraceReader] class.
- */
-class BitbrainsTraceReaderTest {
- @Test
- fun testSmoke() {
- val file = BitbrainsTraceReaderTest::class.java.getResourceAsStream("/bitbrains.csv")!!
- BitbrainsRawTraceReader(file).use { reader ->
- val entry = reader.next()
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
- assertAll(
- { assertEquals(1376314846, entry.timestamp) },
- { assertEquals(19.066, entry.cpuUsage, 0.01) }
- )
- }
- }
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+ implementation(libs.jackson.dataformat.csv)
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
new file mode 100644
index 00000000..767ef919
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the Bitbrains format.
+ */
+internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_MEM_USAGE -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ RESOURCE_STATE_NET_RX -> true
+ RESOURCE_STATE_NET_TX -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, path) = it.next()
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
+ }
+
+ override fun toString(): String = "BitbrainsResourceStateTable"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
new file mode 100644
index 00000000..5687ac7f
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ */
+internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
+ /**
+ * The current parser state.
+ */
+ private val state = RowState()
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ // Reset the row state
+ state.reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
+ "CPU cores" -> state.cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
+ "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
+ "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
+ "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
+ "Memory usage [KB]" -> state.memUsage = parser.doubleValue
+ "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
+ "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
+ "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
+ "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_MEM_USAGE -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ RESOURCE_STATE_NET_RX -> true
+ RESOURCE_STATE_NET_TX -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> partition
+ RESOURCE_STATE_TIMESTAMP -> state.timestamp
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_MEM_USAGE -> state.memUsage
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_NET_RX -> state.netReceived
+ RESOURCE_STATE_NET_TX -> state.netTransmitted
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_MEM_USAGE -> state.memUsage
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_NET_RX -> state.netReceived
+ RESOURCE_STATE_NET_TX -> state.netTransmitted
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * The current row state.
+ */
+ private class RowState {
+ var timestamp: Instant? = null
+ var cpuCores = -1
+ var cpuCapacity = Double.NaN
+ var cpuUsage = Double.NaN
+ var cpuUsagePct = Double.NaN
+ var memCapacity = Double.NaN
+ var memUsage = Double.NaN
+ var diskRead = Double.NaN
+ var diskWrite = Double.NaN
+ var netReceived = Double.NaN
+ var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(';')
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
new file mode 100644
index 00000000..5a2d4243
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the Bitbrains format.
+ */
+public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return BitbrainsResourceStateTable(factory, path)
+ }
+
+ override fun toString(): String = "BitbrainsTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
new file mode 100644
index 00000000..55b11fe3
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the GWF trace format.
+ */
+public class BitbrainsTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory = CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ /**
+ * Open a Bitbrains trace.
+ */
+ override fun open(url: URL): BitbrainsTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return BitbrainsTrace(factory, path)
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..f18135d0
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.bitbrains.BitbrainsTraceFormat
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
new file mode 100644
index 00000000..550805d3
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.RESOURCE_STATE_CPU_USAGE
+import org.opendc.trace.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import java.net.URL
+
+/**
+ * Test suite for the [BitbrainsTraceFormat] class.
+ */
+class BitbrainsTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ assertDoesNotThrow {
+ format.open(url)
+ }
+ }
+
+ @Test
+ fun testTraceDoesNotExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ assertThrows<IllegalArgumentException> {
+ format.open(URL(url.toString() + "help"))
+ }
+ }
+
+ @Test
+ fun testTables() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ @Test
+ fun testSmoke() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ )
+
+ reader.close()
+ }
+}
diff --git a/opendc-format/src/test/resources/bitbrains.csv b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv
index f5e300e8..f5e300e8 100644
--- a/opendc-format/src/test/resources/bitbrains.csv
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/resources/bitbrains.csv
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ec697d80..fd1d404a 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -48,6 +48,7 @@ include(":opendc-telemetry:opendc-telemetry-api")
include(":opendc-telemetry:opendc-telemetry-sdk")
include(":opendc-trace:opendc-trace-api")
include(":opendc-trace:opendc-trace-gwf")
+include(":opendc-trace:opendc-trace-bitbrains")
include(":opendc-trace:opendc-trace-parquet")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")