summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com>  2022-05-02 11:06:31 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com>  2022-05-02 15:37:03 +0200
commit: ae0b12987dca93c05e44341963511ac8cf802793 (patch)
tree: 625eb6d27e34ac2cac27bf8b93381c1c5c0426c0 /opendc-trace/opendc-trace-wtf
parent: a1374a63f81fafc5da565072bae2ecae2e0fed28 (diff)
refactor(trace/wtf): Do not use Avro when reading WTF trace
This change updates the Workflow Trace format implementation in OpenDC to not use the `parquet-avro` library for exporting experiment data, but instead to use the low-level APIs to directly read the data from Parquet. This reduces the amount of conversions necessary before reaching the OpenDC trace API.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf')
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt          |  81
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt              |   5
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt                |  42
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt     |  99
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt | 165
5 files changed, 329 insertions(+), 63 deletions(-)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 1e332aca..f0db78b7 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.wtf
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
+import org.opendc.trace.wtf.parquet.Task
/**
* A [TableReader] implementation for the WTF format.
*/
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Task? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
return when (index) {
- COL_ID -> (record[AVRO_COL_ID] as Long).toString()
- COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
- COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
- COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
- COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_ID -> record.id
+ COL_WORKFLOW_ID -> record.workflowId
+ COL_SUBMIT_TIME -> record.submitTime
+ COL_WAIT_TIME -> record.waitTime
+ COL_RUNTIME -> record.runtime
COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_PARENTS -> record.parents
+ COL_CHILDREN -> record.children
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
- COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
- COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
+ COL_REQ_NCPUS -> record.requestedCpus
+ COL_GROUP_ID -> record.groupId
+ COL_USER_ID -> record.userId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
reader.close()
}
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
- AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
- AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
- AVRO_COL_RUNTIME = schema.getField("runtime").pos()
- AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
- AVRO_COL_PARENTS = schema.getField("parents").pos()
- AVRO_COL_CHILDREN = schema.getField("children").pos()
- AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
- AVRO_COL_USER_ID = schema.getField("user_id").pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_WORKFLOW_ID = -1
- private var AVRO_COL_SUBMIT_TIME = -1
- private var AVRO_COL_WAIT_TIME = -1
- private var AVRO_COL_RUNTIME = -1
- private var AVRO_COL_REQ_NCPUS = -1
- private var AVRO_COL_PARENTS = -1
- private var AVRO_COL_CHILDREN = -1
- private var AVRO_COL_GROUP_ID = -1
- private var AVRO_COL_USER_ID = -1
-
private val COL_ID = 0
private val COL_WORKFLOW_ID = 1
private val COL_SUBMIT_TIME = 2
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c8f9ecaa..aae71c58 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,12 +22,12 @@
package org.opendc.trace.wtf
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
import java.nio.file.Path
/**
@@ -66,7 +66,8 @@ public class WtfTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+ val factory = LocalParquetReader.custom(TaskReadSupport())
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory)
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
new file mode 100644
index 00000000..71557f96
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+ val id: String,
+ val workflowId: String,
+ val submitTime: Instant,
+ val waitTime: Duration,
+ val runtime: Duration,
+ val requestedCpus: Int,
+ val groupId: Int,
+ val userId: Int,
+ val parents: Set<String>,
+ val children: Set<String>
+)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..0017a4a9
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ */
+internal class TaskReadSupport : ReadSupport<Task>() {
+ override fun init(context: InitContext): ReadContext {
+ return ReadContext(READ_SCHEMA)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..08da5eaf
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _workflowId = ""
+ private var _submitTime = Instant.MIN
+ private var _waitTime = Duration.ZERO
+ private var _runtime = Duration.ZERO
+ private var _requestedCpus = 0
+ private var _groupId = 0
+ private var _userId = 0
+ private var _parents = mutableSetOf<String>()
+ private var _children = mutableSetOf<String>()
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _id = value.toString()
+ }
+ }
+ "workflow_id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _workflowId = value.toString()
+ }
+ }
+ "ts_submit" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _submitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _waitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _runtime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _requestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _groupId = value
+ }
+ }
+ "user_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _userId = value
+ }
+ }
+ "children" -> RelationConverter(_children)
+ "parents" -> RelationConverter(_parents)
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _workflowId = ""
+ _submitTime = Instant.MIN
+ _waitTime = Duration.ZERO
+ _runtime = Duration.ZERO
+ _requestedCpus = 0
+ _groupId = 0
+ _userId = 0
+ _parents.clear()
+ _children.clear()
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Task = Task(
+ _id,
+ _workflowId,
+ _submitTime,
+ _waitTime,
+ _runtime,
+ _requestedCpus,
+ _groupId,
+ _userId,
+ _parents.toSet(),
+ _children.toSet()
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+
+ /**
+ * Helper class to convert parent and child relations and add them to [relations].
+ */
+ private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+ private val entryConverter = object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
+ }
+
+ private val listConverter = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return listConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+}