author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-01 22:30:39 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-02 11:30:16 +0200
commit     8bae0f3053a53aac9d483ae97d99f2e7e80b42ef (patch)
tree       293350c299b9d38ea05abf3a854715a26e69d0f4 /opendc-experiments
parent     b2308e1077dc60ec6a4dc646613a4be5b59695a6 (diff)
refactor(capelin): Migrate trace reader to new trace API
This change updates the trace-reading classes in the Capelin experiment to use the new trace API, so that much of the existing trace-reading logic can be reused.
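For reference, the new API replaces direct Avro record iteration with a table/row reader. Below is a minimal sketch of the read loop that the migrated RawParquetTraceReader now uses; the column constants, return types, and Fragment constructor follow this diff, while the function name and the map accumulation are simplified for illustration:

    import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
    import org.opendc.simulator.compute.workload.SimTraceWorkload
    import org.opendc.trace.*
    import java.io.File

    fun readFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
        // Open the Parquet trace through the generic trace API.
        val trace = BPTraceFormat().open(path.toURI().toURL())
        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
        val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
        try {
            while (reader.nextRow()) {
                val id = reader.get(RESOURCE_STATE_ID)               // String
                val time = reader.get(RESOURCE_STATE_TIMESTAMP)      // java.time.Instant
                val duration = reader.get(RESOURCE_STATE_DURATION)   // java.time.Duration
                val cores = reader.getInt(RESOURCE_STATE_NCPUS)
                val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
                // Same argument order as the Fragment constructor in this diff.
                val fragment = SimTraceWorkload.Fragment(time.toEpochMilli(), duration.toMillis(), cpuUsage, cores)
                fragments.getOrPut(id) { mutableListOf() } += fragment
            }
        } finally {
            reader.close()
        }
        return fragments
    }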
Diffstat (limited to 'opendc-experiments')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt | 62
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt | 195
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt | 56
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt | 103
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt | 56
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt | 103
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt | 49
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt | 47
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt | 140
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt | 230
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt | 45
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt | 47
12 files changed, 927 insertions, 206 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index 24ff0ba1..fa4e9ed8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -22,10 +22,10 @@
package org.opendc.experiments.capelin.trace
-import org.apache.avro.generic.GenericData
+import org.opendc.experiments.capelin.trace.bp.BPTraceFormat
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.*
import java.io.File
import java.util.UUID
@@ -36,26 +36,29 @@ import java.util.UUID
*/
class RawParquetTraceReader(private val path: File) {
/**
+ * The [Trace] that represents this trace.
+ */
+ private val trace = BPTraceFormat().open(path.toURI().toURL())
+
+ /**
* Read the fragments into memory.
*/
- private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
+ private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
return try {
- while (true) {
- val record = reader.read() ?: break
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+ val time = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val duration = reader.get(RESOURCE_STATE_DURATION)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
+ time.toEpochMilli(),
+ duration.toMillis(),
cpuUsage,
cores
)
@@ -72,25 +75,24 @@ class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
+ private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
return try {
- while (true) {
- val record = metaReader.read() ?: break
+ while (reader.nextRow()) {
- val id = record["id"].toString()
+ val id = reader.get(RESOURCE_ID)
if (!fragments.containsKey(id)) {
continue
}
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
+ val submissionTime = reader.get(RESOURCE_START_TIME)
+ val endTime = reader.get(RESOURCE_END_TIME)
+ val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY)
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
@@ -98,13 +100,13 @@ class RawParquetTraceReader(private val path: File) {
val workload = SimTraceWorkload(vmFragments)
entries.add(
TraceEntry(
- uid, id, submissionTime, workload,
+ uid, id, submissionTime.toEpochMilli(), workload,
mapOf(
- "submit-time" to submissionTime,
- "end-time" to endTime,
+ "submit-time" to submissionTime.toEpochMilli(),
+ "end-time" to endTime.toEpochMilli(),
"total-load" to totalLoad,
"cores" to maxCores,
- "required-memory" to requiredMemory,
+ "required-memory" to requiredMemory.toLong(),
"workload" to workload
)
)
@@ -116,7 +118,7 @@ class RawParquetTraceReader(private val path: File) {
e.printStackTrace()
throw e
} finally {
- metaReader.close()
+ reader.close()
}
}
@@ -126,8 +128,8 @@ class RawParquetTraceReader(private val path: File) {
private val entries: List<TraceEntry<SimWorkload>>
init {
- val fragments = parseFragments(path)
- entries = parseMeta(path, fragments)
+ val fragments = parseFragments()
+ entries = parseMeta(fragments)
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 0ded32f3..a021de8d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,8 +36,10 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
import org.opendc.trace.*
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -156,189 +158,19 @@ sealed class TraceConversion(name: String) : OptionGroup(name) {
): MutableList<Fragment>
}
-class SolvinityConversion : TraceConversion("Solvinity") {
- private val clusters by option()
- .split(",")
-
- private val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
- .file(canBeDir = false)
- .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } }
- .required()
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val clusters = clusters?.toSet() ?: emptySet()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach file@{ vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val values = line.split("\t")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@file
- }
- }
- }
- }
-
- println("Start of trace at $minTimestamp")
-
- val allFragments = mutableListOf<Fragment>()
-
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split("\t")
-
- vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val timestamp =
- (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
- }
-
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
-
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
- }
-
- return allFragments
- }
-}
-
/**
- * Conversion of the Bitbrains public trace.
+ * A [TraceConversion] that uses the Trace API to perform the conversion.
*/
-class BitbrainsConversion : TraceConversion("Bitbrains") {
+abstract class AbstractConversion(name: String) : TraceConversion(name) {
+ abstract val format: TraceFormat
+
override fun read(
traceDirectory: File,
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
val fragments = mutableListOf<Fragment>()
- val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL())
+ val trace = format.open(traceDirectory.toURI().toURL())
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
var lastId: String? = null
@@ -364,7 +196,7 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
val timestampMs = timestamp.toEpochMilli()
- val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val cores = reader.getInt(RESOURCE_STATE_NCPUS)
val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
@@ -410,6 +242,17 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
}
}
+class SolvinityConversion : AbstractConversion("Solvinity") {
+ override val format: TraceFormat = SvTraceFormat()
+}
+
+/**
+ * Conversion of the Bitbrains public trace.
+ */
+class BitbrainsConversion : AbstractConversion("Bitbrains") {
+ override val format: TraceFormat = BitbrainsTraceFormat()
+}
+
/**
* Conversion of the Azure public VM trace.
*/
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
new file mode 100644
index 00000000..35bfd5ef
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource state [Table] in the Bitbrains Parquet format.
+ */
+internal class BPResourceStateTable(private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCE_STATES
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_DURATION -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ return BPResourceStateTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
new file mode 100644
index 00000000..0e7ee555
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the Bitbrains Parquet format.
+ */
+internal class BPResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: GenericRecord? = null
+
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_DURATION -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_STATE_ID -> record["id"].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
+ RESOURCE_STATE_NCPUS -> record["cores"]
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (column) {
+ RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceStateTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
new file mode 100644
index 00000000..74d1e574
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.nio.file.Path
+
+/**
+ * The resource [Table] in the Bitbrains Parquet format.
+ */
+internal class BPResourceTable(private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCES
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_END_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ return BPResourceTableReader(reader)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
new file mode 100644
index 00000000..0a105783
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.generic.GenericRecord
+import org.opendc.trace.*
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Instant
+
+/**
+ * A [TableReader] implementation for the Bitbrains Parquet format.
+ */
+internal class BPResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: GenericRecord? = null
+
+ override fun nextRow(): Boolean {
+ record = reader.read()
+ return record != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_END_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ @Suppress("UNCHECKED_CAST")
+ val res: Any = when (column) {
+ RESOURCE_ID -> record["id"].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
+ RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
+ RESOURCE_NCPUS -> record["maxCores"]
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_NCPUS -> record["maxCores"] as Int
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (column) {
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "BPResourceTableReader"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
new file mode 100644
index 00000000..486587b1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTrace.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.TABLE_RESOURCES
+import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * A [Trace] in the Bitbrains Parquet format.
+ */
+public class BPTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean =
+ name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_RESOURCES -> BPResourceTable(path)
+ TABLE_RESOURCE_STATES -> BPResourceStateTable(path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "BPTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
new file mode 100644
index 00000000..49d5b4c5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the Bitbrains Parquet trace format.
+ */
+public class BPTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains-parquet"
+
+ /**
+ * Open a Bitbrains Parquet trace.
+ */
+ override fun open(url: URL): BPTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return BPTrace(path)
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
new file mode 100644
index 00000000..71c9d52e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] in the extended Bitbrains format.
+ */
+internal class SvResourceStateTable(path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override fun isSupported(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_CLUSTER_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_CPU_DEMAND -> true
+ RESOURCE_STATE_CPU_READY_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ else -> false
+ }
+ }
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, path) = it.next()
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ val reader = path.bufferedReader()
+ return SvResourceStateTableReader(partition, reader)
+ }
+
+ override fun toString(): String = "SvResourceStateTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
new file mode 100644
index 00000000..adcdb2ea
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.io.BufferedReader
+import java.time.Instant
+
+/**
+ * A [TableReader] for the extended Bitbrains resource state table.
+ */
+internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader {
+ /**
+ * The current parser state.
+ */
+ private val state = RowState()
+
+ override fun nextRow(): Boolean {
+ state.reset()
+
+ var line: String
+ var num = 0
+
+ while (true) {
+ line = reader.readLine() ?: return false
+ num++
+
+ if (line.isBlank() || line[0] == '#') {
+ // Ignore empty lines or comments
+ continue
+ }
+
+ break
+ }
+
+ line = line.trim()
+
+ val length = line.length
+ var col = 0
+ var start = 0
+ var end = 0
+
+ while (end < length) {
+ // Trim all whitespace before the field
+ start = end
+ while (start < length && line[start].isWhitespace()) {
+ start++
+ }
+
+ end = line.indexOf(' ', start)
+
+ if (end < 0) {
+ break
+ }
+
+ val field = line.subSequence(start, end) as String
+ when (col++) {
+ COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10))
+ COL_CPU_USAGE -> state.cpuUsage = field.toDouble()
+ COL_CPU_DEMAND -> state.cpuDemand = field.toDouble()
+ COL_DISK_READ -> state.diskRead = field.toDouble()
+ COL_DISK_WRITE -> state.diskWrite = field.toDouble()
+ COL_CLUSTER_ID -> state.cluster = field.trim()
+ COL_NCPUS -> state.cpuCores = field.toInt(10)
+ COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble()
+ COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1
+ COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble()
+ COL_ID -> state.id = field.trim()
+ COL_MEM_CAPACITY -> state.memCapacity = field.toDouble()
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_CLUSTER_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_CAPACITY -> true
+ RESOURCE_STATE_CPU_USAGE -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ RESOURCE_STATE_CPU_DEMAND -> true
+ RESOURCE_STATE_CPU_READY_PCT -> true
+ RESOURCE_STATE_MEM_CAPACITY -> true
+ RESOURCE_STATE_DISK_READ -> true
+ RESOURCE_STATE_DISK_WRITE -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> state.id
+ RESOURCE_STATE_CLUSTER_ID -> state.cluster
+ RESOURCE_STATE_TIMESTAMP -> state.timestamp
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+ RESOURCE_STATE_CPU_DEMAND -> state.cpuDemand
+ RESOURCE_STATE_CPU_READY_PCT -> state.cpuReadyPct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_POWERED_ON -> state.poweredOn
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_STATE_NCPUS -> state.cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
+ RESOURCE_STATE_CPU_DEMAND -> state.cpuDemand
+ RESOURCE_STATE_CPU_READY_PCT -> state.cpuReadyPct
+ RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
+ RESOURCE_STATE_DISK_READ -> state.diskRead
+ RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ /**
+ * The current row state.
+ */
+ private class RowState {
+ @JvmField
+ var id: String? = null
+ @JvmField
+ var cluster: String? = null
+ @JvmField
+ var timestamp: Instant? = null
+ @JvmField
+ var cpuCores = -1
+ @JvmField
+ var cpuCapacity = Double.NaN
+ @JvmField
+ var cpuUsage = Double.NaN
+ @JvmField
+ var cpuDemand = Double.NaN
+ @JvmField
+ var cpuReadyPct = Double.NaN
+ @JvmField
+ var memCapacity = Double.NaN
+ @JvmField
+ var diskRead = Double.NaN
+ @JvmField
+ var diskWrite = Double.NaN
+ @JvmField
+ var poweredOn: Boolean = false
+
+ /**
+ * Reset the state.
+ */
+ fun reset() {
+ id = null
+ timestamp = null
+ cluster = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuDemand = Double.NaN
+ cpuReadyPct = Double.NaN
+ memCapacity = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ poweredOn = false
+ }
+ }
+
+ /**
+ * Default column indices for the extended Bitbrains format.
+ */
+ private val COL_TIMESTAMP = 0
+ private val COL_CPU_USAGE = 1
+ private val COL_CPU_DEMAND = 2
+ private val COL_DISK_READ = 4
+ private val COL_DISK_WRITE = 6
+ private val COL_CLUSTER_ID = 10
+ private val COL_NCPUS = 12
+ private val COL_CPU_READY_PCT = 13
+ private val COL_POWERED_ON = 14
+ private val COL_CPU_CAPACITY = 18
+ private val COL_ID = 19
+ private val COL_MEM_CAPACITY = 20
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
new file mode 100644
index 00000000..dbd63de5
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTrace.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the extended Bitbrains format.
+ */
+public class SvTrace internal constructor(private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+
+ override fun getTable(name: String): Table? {
+ if (!containsTable(name)) {
+ return null
+ }
+
+ return SvResourceStateTable(path)
+ }
+
+ override fun toString(): String = "SvTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
new file mode 100644
index 00000000..0cce8559
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.sv
+
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the extended Bitbrains trace format.
+ */
+public class SvTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "sv"
+
+ /**
+ * Open the trace file.
+ */
+ override fun open(url: URL): SvTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return SvTrace(path)
+ }
+}
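A similarly minimal sketch of reading a single VM from the extended Bitbrains (Solvinity) text format through the same table API; the partition name is the VM file name without its extension, as established by SvResourceStateTable above, and the function and parameter names here are only illustrative:

    import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
    import org.opendc.trace.*
    import java.io.File

    fun readVm(traceDirectory: File, vmName: String) {
        // Open the directory of per-VM .txt files as a trace.
        val trace = SvTraceFormat().open(traceDirectory.toURI().toURL())
        val table = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES))
        val reader = table.newReader(vmName)  // one partition per <vmName>.txt file
        try {
            while (reader.nextRow()) {
                val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
                val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)  // MHz in the original converter
                val cores = reader.getInt(RESOURCE_STATE_NCPUS)
                println("$vmName @ $timestamp: $cpuUsage MHz on $cores cores")
            }
        } finally {
            reader.close()
        }
    }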