summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src/main/kotlin
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-04-16 09:29:53 +0200
committerGitHub <noreply@github.com>2024-04-16 09:29:53 +0200
commitfff89d25bd3c7b874e68261d21695c473c30ed7d (patch)
treebe368dd745e8119dbdefd9cd0b012c7ff9080a7a /opendc-trace/opendc-trace-api/src/main/kotlin
parenta7b0afbb5b7059274962ade234a50240677008fd (diff)
Revamped the trace system. All TraceFormat files are now in the api m… (#216)
* Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems with not being able to use types of traces * applied spotless
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt219
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt246
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt147
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt292
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt135
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt365
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt175
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt159
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt286
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt104
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt225
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt192
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt166
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt209
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt168
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt197
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt190
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt37
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt159
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt127
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt34
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt149
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt114
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt112
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt121
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt236
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt100
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt314
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt95
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt187
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt102
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt42
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt148
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt188
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt29
35 files changed, 5767 insertions, 2 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt
new file mode 100644
index 00000000..bcf6ff52
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Azure v1 VM resource state table.
+ */
+internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
+ "vm id" -> id = parser.text
+ "CPU avg cpu" -> cpuUsagePct = (parser.doubleValue / 100.0) // Convert from % to [0, 1]
+ }
+ }
+
+ return true
+ }
+
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colCpuUsagePct = 2
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateCpuUsagePct -> colCpuUsagePct
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colCpuUsagePct) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ checkActive()
+ return when (index) {
+ colCpuUsagePct -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ colID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ colTimestamp -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var timestamp: Instant? = null
+ private var cpuUsagePct = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cpuUsagePct = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm id", CsvSchema.ColumnType.STRING)
+ .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
new file mode 100644
index 00000000..d86a0466
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Azure v1 VM resources table.
+ */
+internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "vm id" -> id = parser.text
+ "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
+ "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
+ "vm virtual core count" -> cpuCores = parser.intValue
+ "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
+ }
+ }
+
+ return true
+ }
+
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colMemCapacity = 4
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceMemCapacity -> colMemCapacity
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colMemCapacity) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ checkActive()
+ return when (index) {
+ colCpuCount -> cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ checkActive()
+ return when (index) {
+ colCpuCount -> cpuCores.toLong()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ checkActive()
+ return when (index) {
+ colMemCapacity -> memCapacity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ colID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ colStartTime -> startTime
+ colStopTime -> stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var startTime: Instant? = null
+ private var stopTime: Instant? = null
+ private var cpuCores = -1
+ private var memCapacity = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ startTime = null
+ stopTime = null
+ cpuCores = -1
+ memCapacity = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+ .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
+ .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
new file mode 100644
index 00000000..a75da9d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCES
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import java.util.zip.GZIPInputStream
+import kotlin.io.path.inputStream
+import kotlin.io.path.name
+
+/**
+ * A format implementation for the Azure v1 format.
+ */
+public class AzureTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "azure"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStartTime, TableColumnType.Instant),
+ TableColumn(resourceStopTime, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ ),
+ )
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
+ AzureResourceTableReader(factory.createParser(stream))
+ }
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ /**
+ * Construct a [TableReader] for reading over all VM CPU readings.
+ */
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions =
+ Files.walk(path.resolve("vm_cpu_readings"), 1)
+ .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") }
+ .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, partPath) = it.next()
+ val stream = GZIPInputStream(partPath.inputStream())
+ return AzureResourceStateTableReader(factory.createParser(stream))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "AzureCompositeTableReader"
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
new file mode 100644
index 00000000..8387d1ed
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.io.BufferedReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ */
+internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader {
+ private var state = State.Pending
+
+ override fun nextRow(): Boolean {
+ val state = state
+ if (state == State.Closed) {
+ return false
+ } else if (state == State.Pending) {
+ this.state = State.Active
+ }
+
+ reset()
+
+ var line: String?
+ var num = 0
+
+ while (true) {
+ line = reader.readLine()
+
+ if (line == null) {
+ this.state = State.Closed
+ return false
+ }
+
+ num++
+
+ if (line[0] == '#' || line.isBlank()) {
+ // Ignore empty lines or comments
+ continue
+ }
+
+ break
+ }
+
+ line = line!!.trim()
+
+ val length = line.length
+ var col = 0
+ var start: Int
+ var end = 0
+
+ while (end < length) {
+ // Trim all whitespace before the field
+ start = end
+ while (start < length && line[start].isWhitespace()) {
+ start++
+ }
+
+ end = line.indexOf(' ', start)
+
+ if (end < 0) {
+ end = length
+ }
+
+ val field = line.subSequence(start, end) as String
+ when (col++) {
+ colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+ colCpuUsage -> cpuUsage = field.toDouble()
+ colCpuDemand -> cpuDemand = field.toDouble()
+ colDiskRead -> diskRead = field.toDouble()
+ colDiskWrite -> diskWrite = field.toDouble()
+ colClusterID -> cluster = field.trim()
+ colNcpus -> cpuCores = field.toInt(10)
+ colCpuReadyPct -> cpuReadyPct = field.toDouble()
+ colPoweredOn -> poweredOn = field.toInt(10) == 1
+ colCpuCapacity -> cpuCapacity = field.toDouble()
+ colID -> id = field.trim()
+ colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
+ }
+ }
+
+ return true
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceClusterID -> colClusterID
+ resourceStateTimestamp -> colTimestamp
+ resourceCpuCount -> colNcpus
+ resourceCpuCapacity -> colCpuCapacity
+ resourceStateCpuUsage -> colCpuUsage
+ resourceStateCpuUsagePct -> colCpuUsagePct
+ resourceStateCpuDemand -> colCpuDemand
+ resourceStateCpuReadyPct -> colCpuReadyPct
+ resourceMemCapacity -> colMemCapacity
+ resourceStateDiskRead -> colDiskRead
+ resourceStateDiskWrite -> colDiskWrite
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0 until colMax) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colPoweredOn -> poweredOn
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getInt(index: Int): Int {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colNcpus -> cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colCpuCapacity -> cpuCapacity
+ colCpuUsage -> cpuUsage
+ colCpuUsagePct -> cpuUsage / cpuCapacity
+ colCpuReadyPct -> cpuReadyPct
+ colCpuDemand -> cpuDemand
+ colMemCapacity -> memCapacity
+ colDiskRead -> diskRead
+ colDiskWrite -> diskWrite
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colID -> id
+ colClusterID -> cluster
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colTimestamp -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ reset()
+ state = State.Closed
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var cluster: String? = null
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuDemand = Double.NaN
+ private var cpuReadyPct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var poweredOn: Boolean = false
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cluster = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuDemand = Double.NaN
+ cpuReadyPct = Double.NaN
+ memCapacity = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ poweredOn = false
+ }
+
+ /**
+ * Default column indices for the extended Bitbrains format.
+ */
+ private val colTimestamp = 0
+ private val colCpuUsage = 1
+ private val colCpuDemand = 2
+ private val colDiskRead = 4
+ private val colDiskWrite = 6
+ private val colClusterID = 10
+ private val colNcpus = 12
+ private val colCpuReadyPct = 13
+ private val colPoweredOn = 14
+ private val colCpuCapacity = 18
+ private val colID = 19
+ private val colMemCapacity = 20
+ private val colCpuUsagePct = 21
+ private val colMax = colCpuUsagePct + 1
+
+ private enum class State {
+ Pending,
+ Active,
+ Closed,
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
new file mode 100644
index 00000000..6115953f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * A format implementation for the extended Bitbrains trace format.
+ */
+public class BitbrainsExTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains-ex"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceClusterID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ TableColumn(resourceStateCpuDemand, TableColumnType.Double),
+ TableColumn(resourceStateCpuReadyPct, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ TableColumn(resourceStateDiskRead, TableColumnType.Double),
+ TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ /**
+ * Construct a [TableReader] for reading over all resource state partitions.
+ */
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, partPath) = it.next()
+ return BitbrainsExResourceStateTableReader(partPath.bufferedReader())
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsExCompositeTableReader"
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
new file mode 100644
index 00000000..e264fccb
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -0,0 +1,365 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.text.NumberFormat
+import java.time.Duration
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.Locale
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ */
+internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ /**
+ * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
+ */
+ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+ /**
+ * The type of timestamps in the trace.
+ */
+ private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+ /**
+ * The [NumberFormat] used to parse doubles containing a comma.
+ */
+ private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+ /**
+ * A flag to indicate that the trace contains decimals with a comma separator.
+ */
+ private var usesCommaDecimalSeparator = false
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
+ // Reset the row state
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "Timestamp [ms]" -> {
+ timestamp =
+ when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_MILLIS
+ Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ "CPU cores" -> cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+ "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+ "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+ "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+ "Memory usage [KB]" -> memUsage = parseSafeDouble()
+ "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+ "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+ "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+ "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
+ }
+ }
+
+ return true
+ }
+
+ private val colTimestamp = 0
+ private val colCpuCount = 1
+ private val colCpuCapacity = 2
+ private val colCpuUsage = 3
+ private val colCpuUsagePct = 4
+ private val colMemCapacity = 5
+ private val colMemUsage = 6
+ private val colDiskRead = 7
+ private val colDiskWrite = 8
+ private val colNetRx = 9
+ private val colNetTx = 10
+ private val colID = 11
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceStateCpuUsage -> colCpuUsage
+ resourceStateCpuUsagePct -> colCpuUsagePct
+ resourceMemCapacity -> colMemCapacity
+ resourceStateMemUsage -> colMemUsage
+ resourceStateDiskRead -> colDiskRead
+ resourceStateDiskWrite -> colDiskWrite
+ resourceStateNetRx -> colNetRx
+ resourceStateNetTx -> colNetTx
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colID) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ checkActive()
+ return when (index) {
+ colCpuCount -> cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ checkActive()
+ return when (index) {
+ colCpuCapacity -> cpuCapacity
+ colCpuUsage -> cpuUsage
+ colCpuUsagePct -> cpuUsagePct
+ colMemCapacity -> memCapacity
+ colMemUsage -> memUsage
+ colDiskRead -> diskRead
+ colDiskWrite -> diskWrite
+ colNetRx -> netReceived
+ colNetTx -> netTransmitted
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String {
+ checkActive()
+ return when (index) {
+ colID -> partition
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ colTimestamp -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * Try to parse the current value safely as double.
+ */
+ private fun parseSafeDouble(): Double {
+ if (!usesCommaDecimalSeparator) {
+ try {
+ return parser.doubleValue
+ } catch (e: JsonParseException) {
+ usesCommaDecimalSeparator = true
+ }
+ }
+
+ val text = parser.text
+ if (text.isBlank()) {
+ return 0.0
+ }
+
+ return nf.parse(text).toDouble()
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuUsagePct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var memUsage = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var netReceived = Double.NaN
+ private var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+
+ /**
+ * The type of the timestamp in the trace.
+ */
+ private enum class TimestampType {
+ UNDECIDED,
+ DATE_TIME,
+ EPOCH_MILLIS,
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
+ .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(';')
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt
new file mode 100644
index 00000000..a12785f0
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceID
+import java.nio.file.Path
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Bitbrains resource table.
+ */
+internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader {
+ /**
+ * An iterator to iterate over the resource entries.
+ */
+ private val it = vms.iterator()
+
+ /**
+ * The state of the reader.
+ */
+ private var state = State.Pending
+
+ override fun nextRow(): Boolean {
+ if (state == State.Pending) {
+ state = State.Active
+ }
+
+ reset()
+
+ while (it.hasNext()) {
+ val (name, path) = it.next()
+
+ val parser = factory.createParser(path.toFile())
+ val reader = BitbrainsResourceStateTableReader(name, parser)
+ val idCol = reader.resolve(resourceID)
+
+ try {
+ if (!reader.nextRow()) {
+ continue
+ }
+
+ id = reader.getString(idCol)
+ return true
+ } finally {
+ reader.close()
+ }
+ }
+
+ state = State.Closed
+ return false
+ }
+
+ private val colID = 0
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colID) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reset()
+ state = State.Closed
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ }
+
+ private enum class State {
+ Pending,
+ Active,
+ Closed,
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
new file mode 100644
index 00000000..23853077
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCES
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * A format implementation for the GWF trace format.
+ */
+public class BitbrainsTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "bitbrains"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ ),
+ )
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ TableColumn(resourceStateMemUsage, TableColumnType.Double),
+ TableColumn(resourceStateDiskRead, TableColumnType.Double),
+ TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+ TableColumn(resourceStateNetRx, TableColumnType.Double),
+ TableColumn(resourceStateNetTx, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val vms =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ BitbrainsResourceTableReader(factory, vms)
+ }
+ TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ /**
+ * Construct a [TableReader] for reading over all resource state partitions.
+ */
+ private fun newResourceStateReader(path: Path): TableReader {
+ val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+ val it = partitions.iterator()
+
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
+ return if (it.hasNext()) {
+ val (partition, partPath) = it.next()
+ return BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "BitbrainsCompositeTableReader"
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
new file mode 100644
index 00000000..8a2a99cb
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.TASK_ALLOC_NCPUS
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.util.convertTo
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+import java.util.regex.Pattern
+
+/**
+ * A [TableReader] implementation for the GWF format.
+ */
+internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
+ // Reset the row state
+ reset()
+
+ if (parser.isClosed || !nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "WorkflowID" -> workflowId = parser.text
+ "JobID" -> jobId = parser.text
+ "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
+ "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
+ "NProcs" -> nProcs = parser.intValue
+ "ReqNProcs" -> reqNProcs = parser.intValue
+ "Dependencies" -> dependencies = parseParents(parser.valueAsString)
+ }
+ }
+
+ return true
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> colJobID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_RUNTIME -> colRuntime
+ TASK_ALLOC_NCPUS -> colNproc
+ TASK_REQ_NCPUS -> colReqNproc
+ TASK_PARENTS -> colDeps
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colDeps) { "Invalid column" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ checkActive()
+ return when (index) {
+ colReqNproc -> reqNProcs
+ colNproc -> nProcs
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ colJobID -> jobId
+ colWorkflowID -> workflowId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ colSubmitTime -> submitTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ checkActive()
+ return when (index) {
+ colRuntime -> runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ checkActive()
+ return when (index) {
+ colDeps -> typeDeps.convertTo(dependencies, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * The pattern used to parse the parents.
+ */
+ private val pattern = Pattern.compile("\\s+")
+
+ /**
+ * Parse the parents into a set of longs.
+ */
+ private fun parseParents(value: String): Set<String> {
+ val result = mutableSetOf<String>()
+ val deps = value.split(pattern)
+
+ for (dep in deps) {
+ if (dep.isBlank()) {
+ continue
+ }
+
+ result.add(dep)
+ }
+
+ return result
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * Reader state fields.
+ */
+ private var workflowId: String? = null
+ private var jobId: String? = null
+ private var submitTime: Instant? = null
+ private var runtime: Duration? = null
+ private var nProcs = -1
+ private var reqNProcs = -1
+ private var dependencies = emptySet<String>()
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ workflowId = null
+ jobId = null
+ submitTime = null
+ runtime = null
+ nProcs = -1
+ reqNProcs = -1
+ dependencies = emptySet()
+ }
+
+ private val colWorkflowID = 0
+ private val colJobID = 1
+ private val colSubmitTime = 2
+ private val colRuntime = 3
+ private val colNproc = 4
+ private val colReqNproc = 5
+ private val colDeps = 6
+
+ private val typeDeps = TableColumnType.Set(TableColumnType.String)
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(',')
+ .build()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt
new file mode 100644
index 00000000..097c5593
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.gwf
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_ALLOC_NCPUS
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation for the GWF trace format.
+ */
+public class GwfTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "gwf"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt
new file mode 100644
index 00000000..dba971d7
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -0,0 +1,225 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
+import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
+import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import org.opendc.trace.util.convertTo
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the OpenDC VM interference JSON format.
+ */
+internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
+ override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+
+ parser.nextToken()
+
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
+ }
+ }
+
+ return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) {
+ parser.close()
+ reset()
+ false
+ } else {
+ parseGroup(parser)
+ true
+ }
+ }
+
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
+
+ private val typeMembers = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ return when (index) {
+ colMembers, colTarget, colScore -> false
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDouble(index: Int): Double {
+ checkActive()
+ return when (index) {
+ colTarget -> targetLoad
+ colScore -> score
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun getString(index: Int): String? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ checkActive()
+ return when (index) {
+ colMembers -> typeMembers.convertTo(members, elementType)
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ private var members = emptySet<String>()
+ private var targetLoad = Double.POSITIVE_INFINITY
+ private var score = 1.0
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ members = emptySet()
+ targetLoad = Double.POSITIVE_INFINITY
+ score = 1.0
+ }
+
+ /**
+ * Parse a group an interference JSON file.
+ */
+ private fun parseGroup(parser: JsonParser) {
+ var targetLoad = Double.POSITIVE_INFINITY
+ var score = 1.0
+ val members = mutableSetOf<String>()
+
+ if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "vms" -> parseGroupMembers(parser, members)
+ "minServerLoad" -> targetLoad = parser.doubleValue
+ "performanceScore" -> score = parser.doubleValue
+ }
+ }
+
+ this.members = members
+ this.targetLoad = targetLoad
+ this.score = score
+ }
+
+ /**
+ * Parse the members of a group.
+ */
+ private fun parseGroupMembers(
+ parser: JsonParser,
+ members: MutableSet<String>,
+ ) {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_ARRAY) {
+ if (parser.currentToken() != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected string value for group member")
+ }
+
+ members.add(parser.text)
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt
new file mode 100644
index 00000000..b3286a1c
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import com.fasterxml.jackson.core.JsonGenerator
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
+import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
+import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableWriter] implementation for the OpenDC VM interference JSON format.
+ */
+internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter {
+ /**
+ * A flag to indicate whether a row has been started.
+ */
+ private var isRowActive = false
+
+ init {
+ generator.writeStartArray()
+ }
+
+ override fun startRow() {
+ // Reset state
+ members = emptySet()
+ targetLoad = Double.POSITIVE_INFINITY
+ score = 1.0
+
+ // Mark row as active
+ isRowActive = true
+ }
+
+ override fun endRow() {
+ check(isRowActive) { "No active row" }
+
+ generator.writeStartObject()
+ generator.writeArrayFieldStart("vms")
+ for (member in members) {
+ generator.writeString(member)
+ }
+ generator.writeEndArray()
+ generator.writeNumberField("minServerLoad", targetLoad)
+ generator.writeNumberField("performanceScore", score)
+ generator.writeEndObject()
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
+ else -> -1
+ }
+ }
+
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(isRowActive) { "No active row" }
+
+ when (index) {
+ colTarget -> targetLoad = (value as Number).toDouble()
+ colScore -> score = (value as Number).toDouble()
+ else -> throw IllegalArgumentException("Invalid column $index")
+ }
+ }
+
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
+ check(isRowActive) { "No active row" }
+
+ @Suppress("UNCHECKED_CAST")
+ when (index) {
+ colMembers -> members = value as Set<String>
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun flush() {
+ generator.flush()
+ }
+
+ override fun close() {
+ generator.writeEndArray()
+ generator.close()
+ }
+
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
+
+ private var members = emptySet<String>()
+ private var targetLoad = Double.POSITIVE_INFINITY
+ private var score = 1.0
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt
new file mode 100644
index 00000000..39475f9f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.formats.opendc.parquet.ResourceState
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: ResourceState? = null
+
+ override fun nextRow(): Boolean {
+ try {
+ val record = reader.read()
+ this.record = record
+
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
+ }
+ }
+
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colCpuUsage) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getInt(index: Int): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colCpuCount -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getDouble(index: Int): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colCpuUsage -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colID -> record.id
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colTimestamp -> record.timestamp
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colDuration -> record.duration
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "OdcVmResourceStateTableReader"
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt
new file mode 100644
index 00000000..1421d77c
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.formats.opendc.parquet.ResourceState
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
+ /**
+ * The current state for the record that is being written.
+ */
+ private var localIsActive = false
+ private var localID: String = ""
+ private var localTimestamp: Instant = Instant.MIN
+ private var localDuration: Duration = Duration.ZERO
+ private var localCpuCount: Int = 0
+ private var localCpuUsage: Double = Double.NaN
+
+ override fun startRow() {
+ localIsActive = true
+ localID = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = Double.NaN
+ }
+
+ override fun endRow() {
+ check(localIsActive) { "No active row" }
+ localIsActive = false
+
+ check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+
+ writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage))
+
+ lastId = localID
+ lastTimestamp = localTimestamp
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
+ else -> -1
+ }
+ }
+
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colCpuCount -> localCpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colCpuUsage -> localCpuUsage = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
+
+ when (index) {
+ colID -> localID = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
+
+ when (index) {
+ colTimestamp -> localTimestamp = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
+ check(localIsActive) { "No active row" }
+
+ when (index) {
+ colDuration -> localDuration = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun flush() {
+ // Not available
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ /**
+ * Last column values that are used to check for correct partitioning.
+ */
+ private var lastId: String? = null
+ private var lastTimestamp: Instant = Instant.MAX
+
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
new file mode 100644
index 00000000..34197d7f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
+import org.opendc.trace.formats.opendc.parquet.Resource
+import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: Resource? = null
+
+ override fun nextRow(): Boolean {
+ try {
+ val record = reader.read()
+ this.record = record
+
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
+ }
+ }
+
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colMemCapacity) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colCpuCount -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colCpuCapacity -> record.cpuCapacity
+ colMemCapacity -> record.memCapacity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colID -> record.id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colStartTime -> record.startTime
+ colStopTime -> record.stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ override fun toString(): String = "OdcVmResourceTableReader"
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
new file mode 100644
index 00000000..e0a11368
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
+import org.opendc.trace.formats.opendc.parquet.Resource
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableWriter] implementation for the OpenDC virtual machine trace format.
+ */
+internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
+ /**
+ * The current state for the record that is being written.
+ */
+ private var localIsActive = false
+ private var localId: String = ""
+ private var localStartTime: Instant = Instant.MIN
+ private var localStopTime: Instant = Instant.MIN
+ private var localCpuCount: Int = 0
+ private var localCpuCapacity: Double = Double.NaN
+ private var localMemCapacity: Double = Double.NaN
+
+ override fun startRow() {
+ localIsActive = true
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = Double.NaN
+ localMemCapacity = Double.NaN
+ }
+
+ override fun endRow() {
+ check(localIsActive) { "No active row" }
+ localIsActive = false
+ writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity))
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
+ else -> -1
+ }
+ }
+
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colCpuCount -> localCpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colCpuCapacity -> localCpuCapacity = value
+ colMemCapacity -> localMemCapacity = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colID -> localId = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
+ when (index) {
+ colStartTime -> localStartTime = value
+ colStopTime -> localStopTime = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun flush() {
+ // Not available
+ }
+
+ override fun close() {
+ writer.close()
+ }
+
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
new file mode 100644
index 00000000..7a7bc834
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc
+
+import com.fasterxml.jackson.core.JsonEncoding
+import com.fasterxml.jackson.core.JsonFactory
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
+import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
+import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
+import org.opendc.trace.conv.TABLE_RESOURCES
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
+import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport
+import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport
+import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport
+import org.opendc.trace.formats.opendc.parquet.ResourceWriteSupport
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.nio.file.Files
+import java.nio.file.Path
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation of the OpenDC virtual machine trace format.
+ */
+public class OdcVmTraceFormat : TraceFormat {
+ /**
+ * A [JsonFactory] that is used to parse the JSON-based interference model.
+ */
+ private val jsonFactory = JsonFactory()
+
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "opendc-vm"
+
+ override fun create(path: Path) {
+ // Construct directory containing the trace files
+ Files.createDirectories(path)
+
+ val tables = getTables(path)
+
+ for (table in tables) {
+ val writer = newWriter(path, table)
+ writer.close()
+ }
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStartTime, TableColumnType.Instant),
+ TableColumn(resourceStopTime, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ ),
+ )
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceStateDuration, TableColumnType.Duration),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ ),
+ )
+ TABLE_INTERFERENCE_GROUPS ->
+ TableDetails(
+ listOf(
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
+ OdcVmResourceTableReader(reader)
+ }
+ TABLE_RESOURCE_STATES -> {
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
+ OdcVmResourceStateTableReader(reader)
+ }
+ TABLE_INTERFERENCE_GROUPS -> {
+ val modelPath = path.resolve("interference-model.json")
+ val parser =
+ if (modelPath.exists()) {
+ jsonFactory.createParser(modelPath.toFile())
+ } else {
+ jsonFactory.createParser("[]") // If model does not exist, return empty model
+ }
+
+ OdcVmInterferenceJsonTableReader(parser)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ return when (table) {
+ TABLE_RESOURCES -> {
+ val writer =
+ LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceTableWriter(writer)
+ }
+ TABLE_RESOURCE_STATES -> {
+ val writer =
+ LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
+ OdcVmResourceStateTableWriter(writer)
+ }
+ TABLE_INTERFERENCE_GROUPS -> {
+ val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
+ OdcVmInterferenceJsonTableWriter(generator)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
new file mode 100644
index 00000000..e8efe60f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import java.time.Instant
+
+/**
+ * A description of a resource in a trace.
+ */
+internal data class Resource(
+ val id: String,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Double,
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
new file mode 100644
index 00000000..75238344
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
+
+/**
+ * A [ReadSupport] instance for [Resource] objects.
+ */
+internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "submissionTime" to resourceStartTime,
+ "start_time" to resourceStartTime,
+ "endTime" to resourceStopTime,
+ "stop_time" to resourceStopTime,
+ "maxCores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpu_capacity" to resourceCpuCapacity,
+ "requiredMemory" to resourceMemCapacity,
+ "mem_capacity" to resourceMemCapacity,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType =
+ READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
new file mode 100644
index 00000000..2e32c2e2
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [Resource] records.
+ */
+internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
+ /**
+ * State of current record being read.
+ */
+ private var localId = ""
+ private var localStartTime = Instant.MIN
+ private var localStopTime = Instant.MIN
+ private var localCpuCount = 0
+ private var localCpuCapacity = 0.0
+ private var localMemCapacity = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStartTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_capacity" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localMemCapacity = value
+ }
+
+ override fun addLong(value: Long) {
+ localMemCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = 0.0
+ localMemCapacity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Resource =
+ Resource(
+ localId,
+ localStartTime,
+ localStopTime,
+ localCpuCount,
+ localCpuCapacity,
+ localMemCapacity,
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt
new file mode 100644
index 00000000..64ab9dca
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+internal class ResourceState(
+ val id: String,
+ val timestamp: Instant,
+ val duration: Duration,
+ val cpuCount: Int,
+ val cpuUsage: Double,
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..e7d35630
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+
+/**
+ * A [ReadSupport] instance for [ResourceState] objects.
+ */
+internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "time" to resourceStateTimestamp,
+ "timestamp" to resourceStateTimestamp,
+ "duration" to resourceStateDuration,
+ "cores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpuUsage" to resourceStateCpuUsage,
+ "cpu_usage" to resourceStateCpuUsage,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage"),
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
new file mode 100644
index 00000000..8ff0e476
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [ResourceState] records.
+ */
+internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
+ /**
+ * State of current record being read.
+ */
+ private var localId = ""
+ private var localTimestamp = Instant.MIN
+ private var localDuration = Duration.ZERO
+ private var localCpuCount = 0
+ private var localCpuUsage = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localTimestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localDuration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuUsage = value
+ }
+ }
+ "flops" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ localId = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..58c43916
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+
+/**
+ * Support for writing [Resource] instances to Parquet format.
+ */
+internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ResourceState) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(
+ consumer: RecordConsumer,
+ record: ResourceState,
+ ) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("timestamp", 1)
+ consumer.addLong(record.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 1)
+
+ consumer.startField("duration", 2)
+ consumer.addLong(record.duration.toMillis())
+ consumer.endField("duration", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_usage", 4)
+ consumer.addDouble(record.cpuUsage)
+ consumer.endField("cpu_usage", 4)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
new file mode 100644
index 00000000..a9937ffd
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.formats.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import kotlin.math.roundToLong
+
+/**
+ * Support for writing [Resource] instances to Parquet format.
+ */
+internal class ResourceWriteSupport : WriteSupport<Resource>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Resource) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(
+ consumer: RecordConsumer,
+ record: Resource,
+ ) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("start_time", 1)
+ consumer.addLong(record.startTime.toEpochMilli())
+ consumer.endField("start_time", 1)
+
+ consumer.startField("stop_time", 2)
+ consumer.addLong(record.stopTime.toEpochMilli())
+ consumer.endField("stop_time", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_capacity", 4)
+ consumer.addDouble(record.cpuCapacity)
+ consumer.endField("cpu_capacity", 4)
+
+ consumer.startField("mem_capacity", 5)
+ consumer.addLong(record.memCapacity.roundToLong())
+ consumer.endField("mem_capacity", 5)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt
new file mode 100644
index 00000000..5a79fd6f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt
@@ -0,0 +1,236 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.TASK_ALLOC_NCPUS
+import org.opendc.trace.conv.TASK_GROUP_ID
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_STATUS
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_USER_ID
+import org.opendc.trace.conv.TASK_WAIT_TIME
+import java.io.BufferedReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the SWF format.
+ */
+internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader {
+ /**
+ * A flag to indicate the state of the reader
+ */
+ private var state = State.Pending
+
+ /**
+ * The current row.
+ */
+ private var fields = emptyList<String>()
+
+ /**
+ * A [Regex] object to match whitespace.
+ */
+ private val whitespace = "\\s+".toRegex()
+
+ override fun nextRow(): Boolean {
+ var line: String?
+ var num = 0
+
+ val state = state
+ if (state == State.Closed) {
+ return false
+ } else if (state == State.Pending) {
+ this.state = State.Active
+ }
+
+ while (true) {
+ line = reader.readLine()
+
+ if (line == null) {
+ this.state = State.Closed
+ return false
+ }
+ num++
+
+ if (line.isBlank()) {
+ // Ignore empty lines
+ continue
+ } else if (line.startsWith(";")) {
+ // Ignore comments for now
+ continue
+ }
+
+ break
+ }
+
+ fields = line!!.trim().split(whitespace)
+
+ if (fields.size < 18) {
+ throw IllegalArgumentException("Invalid format at line $line")
+ }
+
+ return true
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> colJobID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_WAIT_TIME -> colWaitTime
+ TASK_RUNTIME -> colRunTime
+ TASK_ALLOC_NCPUS -> colAllocNcpus
+ TASK_REQ_NCPUS -> colReqNcpus
+ TASK_STATUS -> colStatus
+ TASK_USER_ID -> colUserID
+ TASK_GROUP_ID -> colGroupID
+ TASK_PARENTS -> colParentJob
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in colJobID..colParentThinkTime) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colJobID -> fields[index]
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10))
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10))
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ check(state == State.Active) { "No active row" }
+ @Suppress("UNCHECKED_CAST")
+ return when (index) {
+ colParentJob -> {
+ require(elementType.isAssignableFrom(String::class.java))
+ val parent = fields[index].toLong(10)
+ if (parent < 0) emptySet() else setOf(parent)
+ }
+ else -> throw IllegalArgumentException("Invalid column")
+ } as Set<T>?
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ state = State.Closed
+ }
+
+ /**
+ * Default column indices for the SWF format.
+ */
+ private val colJobID = 0
+ private val colSubmitTime = 1
+ private val colWaitTime = 2
+ private val colRunTime = 3
+ private val colAllocNcpus = 4
+ private val colAvgCpuTime = 5
+ private val colUsedMem = 6
+ private val colReqNcpus = 7
+ private val colReqTime = 8
+ private val colReqMem = 9
+ private val colStatus = 10
+ private val colUserID = 11
+ private val colGroupID = 12
+ private val colExecNum = 13
+ private val colQueueNum = 14
+ private val colPartNum = 15
+ private val colParentJob = 16
+ private val colParentThinkTime = 17
+
+ private enum class State {
+ Pending,
+ Active,
+ Closed,
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt
new file mode 100644
index 00000000..d59b07b4
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.swf
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_ALLOC_NCPUS
+import org.opendc.trace.conv.TASK_GROUP_ID
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_STATUS
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_USER_ID
+import org.opendc.trace.conv.TASK_WAIT_TIME
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import java.nio.file.Path
+import kotlin.io.path.bufferedReader
+
+/**
+ * Support for the Standard Workload Format (SWF) in OpenDC.
+ *
+ * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
+ */
+public class SwfTraceFormat : TraceFormat {
+ override val name: String = "swf"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_STATUS, TableColumnType.Int),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt
new file mode 100644
index 00000000..8f84e51f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt
@@ -0,0 +1,314 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.util.convertTo
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+import kotlin.math.roundToInt
+
+/**
+ * A [TableReader] implementation for the WfCommons workload trace format.
+ */
+internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader {
+ /**
+ * The current nesting of the parser.
+ */
+ private var level: ParserLevel = ParserLevel.TOP
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ var hasJob = false
+
+ while (!hasJob) {
+ when (level) {
+ ParserLevel.TOP -> {
+ val token = parser.nextToken()
+
+ // Check whether the document is not empty and starts with an object
+ if (token == null) {
+ parser.close()
+ break
+ } else if (token != JsonToken.START_OBJECT) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.TRACE
+ }
+ }
+ ParserLevel.TRACE -> {
+ // Seek for the workflow object in the file
+ if (!seekWorkflow()) {
+ parser.close()
+ break
+ } else if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.WORKFLOW
+ }
+ }
+ ParserLevel.WORKFLOW -> {
+ // Seek for the jobs object in the file
+ level =
+ if (!seekJobs()) {
+ ParserLevel.TRACE
+ } else if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ } else {
+ ParserLevel.JOB
+ }
+ }
+ ParserLevel.JOB -> {
+ when (parser.nextToken()) {
+ JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW
+ JsonToken.START_OBJECT -> {
+ parseJob()
+ hasJob = true
+ break
+ }
+ else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation)
+ }
+ }
+ }
+ }
+
+ return hasJob
+ }
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> colID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_RUNTIME -> colRuntime
+ TASK_REQ_NCPUS -> colNproc
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..colChildren) { "Invalid column value" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ checkActive()
+ return when (index) {
+ colNproc -> cores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ colID -> id
+ colWorkflowID -> workflowId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ checkActive()
+ return when (index) {
+ colRuntime -> runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ checkActive()
+ return when (index) {
+ colParents -> typeParents.convertTo(parents, elementType)
+ colChildren -> typeChildren.convertTo(children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
+ * Parse the trace and seek until the workflow description.
+ */
+ private fun seekWorkflow(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) {
+ when (parser.currentName) {
+ "name" -> workflowId = parser.text
+ "workflow" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse the workflow description in the file and seek until the first job.
+ */
+ private fun seekJobs(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "jobs" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse a single job in the file.
+ */
+ private fun parseJob() {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "name" -> id = parser.text
+ "parents" -> parents = parseIds()
+ "children" -> children = parseIds()
+ "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong())
+ "cores" -> cores = parser.floatValue.roundToInt()
+ else -> parser.skipChildren()
+ }
+ }
+ }
+
+ /**
+ * Parse the parents/children of a job.
+ */
+ private fun parseIds(): Set<String> {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ }
+
+ val ids = mutableSetOf<String>()
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ if (parser.currentToken != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected token", parser.currentLocation)
+ }
+
+ ids.add(parser.valueAsString)
+ }
+
+ return ids
+ }
+
+ private enum class ParserLevel {
+ TOP,
+ TRACE,
+ WORKFLOW,
+ JOB,
+ }
+
+ /**
+ * State fields for the parser.
+ */
+ private var id: String? = null
+ private var workflowId: String? = null
+ private var runtime: Duration? = null
+ private var parents: Set<String>? = null
+ private var children: Set<String>? = null
+ private var cores = -1
+
+ private fun reset() {
+ id = null
+ runtime = null
+ parents = null
+ children = null
+ cores = -1
+ }
+
+ private val colID = 0
+ private val colWorkflowID = 1
+ private val colRuntime = 3
+ private val colNproc = 4
+ private val colParents = 5
+ private val colChildren = 6
+
+ private val typeParents = TableColumnType.Set(TableColumnType.String)
+ private val typeChildren = TableColumnType.Set(TableColumnType.String)
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt
new file mode 100644
index 00000000..2178fac6
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation for the WfCommons workload trace format.
+ */
+public class WfFormatTraceFormat : TraceFormat {
+ /**
+ * The [JsonFactory] that is used to created JSON parsers.
+ */
+ private val factory = JsonFactory()
+
+ override val name: String = "wfformat"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt
new file mode 100644
index 00000000..95582388
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_GROUP_ID
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_USER_ID
+import org.opendc.trace.conv.TASK_WAIT_TIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.util.convertTo
+import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.Task
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] implementation for the WTF format.
+ */
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
+ /**
+ * The current record.
+ */
+ private var record: Task? = null
+
+ override fun nextRow(): Boolean {
+ try {
+ val record = reader.read()
+ this.record = record
+
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
+ }
+ }
+
+ private val colID = 0
+ private val colWorkflowID = 1
+ private val colSubmitTime = 2
+ private val colWaitTime = 3
+ private val colRuntime = 4
+ private val colReqNcpus = 5
+ private val colParents = 6
+ private val colChildren = 7
+ private val colGroupID = 8
+ private val colUserID = 9
+
+ private val typeParents = TableColumnType.Set(TableColumnType.String)
+ private val typeChildren = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> colID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_WAIT_TIME -> colWaitTime
+ TASK_RUNTIME -> colRuntime
+ TASK_REQ_NCPUS -> colReqNcpus
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ TASK_GROUP_ID -> colGroupID
+ TASK_USER_ID -> colUserID
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean {
+ require(index in colID..colUserID) { "Invalid column index" }
+ return false
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colReqNcpus -> record.requestedCpus
+ colGroupID -> record.groupId
+ colUserID -> record.userId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colID -> record.id
+ colWorkflowID -> record.workflowId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colSubmitTime -> record.submitTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colWaitTime -> record.waitTime
+ colRuntime -> record.runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colParents -> typeParents.convertTo(record.parents, elementType)
+ colChildren -> typeChildren.convertTo(record.children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt
new file mode 100644
index 00000000..1386d2ef
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_TASKS
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_GROUP_ID
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_USER_ID
+import org.opendc.trace.conv.TASK_WAIT_TIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
+import java.nio.file.Path
+
+/**
+ * A [TraceFormat] implementation for the Workflow Trace Format (WTF).
+ */
+public class WtfTraceFormat : TraceFormat {
+ override val name: String = "wtf"
+
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
+ return when (table) {
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int),
+ ),
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
+ return when (table) {
+ TABLE_TASKS -> {
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
+ WtfTaskTableReader(reader)
+ }
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt
new file mode 100644
index 00000000..a1db0cab
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+ val id: String,
+ val workflowId: String,
+ val submitTime: Instant,
+ val waitTime: Duration,
+ val runtime: Duration,
+ val requestedCpus: Int,
+ val groupId: Int,
+ val userId: Int,
+ val parents: Set<String>,
+ val children: Set<String>,
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..1f9c506d
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
+import org.opendc.trace.conv.TASK_CHILDREN
+import org.opendc.trace.conv.TASK_GROUP_ID
+import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.conv.TASK_PARENTS
+import org.opendc.trace.conv.TASK_REQ_NCPUS
+import org.opendc.trace.conv.TASK_RUNTIME
+import org.opendc.trace.conv.TASK_SUBMIT_TIME
+import org.opendc.trace.conv.TASK_USER_ID
+import org.opendc.trace.conv.TASK_WAIT_TIME
+import org.opendc.trace.conv.TASK_WORKFLOW_ID
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap =
+ mapOf(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id",
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..412a4f8b
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+import kotlin.math.roundToLong
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+ /**
+ * State of current record being read.
+ */
+ private var localID = ""
+ private var localWorkflowID = ""
+ private var localSubmitTime = Instant.MIN
+ private var localWaitTime = Duration.ZERO
+ private var localRuntime = Duration.ZERO
+ private var localRequestedCpus = 0
+ private var localGroupId = 0
+ private var localUserId = 0
+ private var localParents = mutableSetOf<String>()
+ private var localChildren = mutableSetOf<String>()
+
+ /**
+ * Root converter for the record.
+ */
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localID = value.toString()
+ }
+ }
+ "workflow_id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWorkflowID = value.toString()
+ }
+ }
+ "ts_submit" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localSubmitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWaitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localRuntime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localRequestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localGroupId = value
+ }
+ }
+ "user_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localUserId = value
+ }
+ }
+ "children" -> RelationConverter(localChildren)
+ "parents" -> RelationConverter(localParents)
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ localID = ""
+ localWorkflowID = ""
+ localSubmitTime = Instant.MIN
+ localWaitTime = Duration.ZERO
+ localRuntime = Duration.ZERO
+ localRequestedCpus = 0
+ localGroupId = 0
+ localUserId = 0
+ localParents.clear()
+ localChildren.clear()
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Task =
+ Task(
+ localID,
+ localWorkflowID,
+ localSubmitTime,
+ localWaitTime,
+ localRuntime,
+ localRequestedCpus,
+ localGroupId,
+ localUserId,
+ localParents.toSet(),
+ localChildren.toSet(),
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+
+ /**
+ * Helper class to convert parent and child relations and add them to [relations].
+ */
+ private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+ private val entryConverter =
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
+
+ override fun addDouble(value: Double) {
+ relations.add(value.roundToLong().toString())
+ }
+ }
+
+ private val listConverter =
+ object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
+
+ override fun start() {}
+
+ override fun end() {}
+ }
+
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return listConverter
+ }
+
+ override fun start() {}
+
+ override fun end() {}
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 83537822..89cac608 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -24,6 +24,13 @@ package org.opendc.trace.spi
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
+import org.opendc.trace.azure.AzureTraceFormat
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
+import org.opendc.trace.formats.opendc.OdcVmTraceFormat
+import org.opendc.trace.gwf.GwfTraceFormat
+import org.opendc.trace.swf.SwfTraceFormat
+import org.opendc.trace.wfformat.WfFormatTraceFormat
+import org.opendc.trace.wtf.WtfTraceFormat
import java.nio.file.Path
import java.util.ServiceLoader
@@ -107,13 +114,31 @@ public interface TraceFormat {
return ServiceLoader.load(TraceFormat::class.java)
}
+// /**
+// * Obtain a [TraceFormat] implementation by [name].
+// */
+// @JvmStatic
+// public fun byName(name: String): TraceFormat? {
+//
+// val loader = ServiceLoader.load(TraceFormat::class.java)
+// return loader.find { it.name == name }
+// }
+
/**
* Obtain a [TraceFormat] implementation by [name].
*/
@JvmStatic
public fun byName(name: String): TraceFormat? {
- val loader = ServiceLoader.load(TraceFormat::class.java)
- return loader.find { it.name == name }
+ return when (name) {
+ "opendc-vm" -> OdcVmTraceFormat()
+ "azure" -> AzureTraceFormat()
+ "bitbrains" -> BitbrainsTraceFormat()
+ "gwf" -> GwfTraceFormat()
+ "swf" -> SwfTraceFormat()
+ "wfformat" -> WfFormatTraceFormat()
+ "wtf" -> WtfTraceFormat()
+ else -> null
+ }
}
}
}