summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-04-16 09:29:53 +0200
committerGitHub <noreply@github.com>2024-04-16 09:29:53 +0200
commitfff89d25bd3c7b874e68261d21695c473c30ed7d (patch)
treebe368dd745e8119dbdefd9cd0b012c7ff9080a7a /opendc-trace/opendc-trace-wtf/src/main
parenta7b0afbb5b7059274962ade234a50240677008fd (diff)
Revamped the trace system. All TraceFormat files are now in the api m… (#216)
* Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems with not being able to use types of traces * applied spotless
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src/main')
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt187
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt102
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt42
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt148
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt188
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat1
6 files changed, 0 insertions, 668 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
deleted file mode 100644
index 95582388..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf
-
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.util.convertTo
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.wtf.parquet.Task
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-
-/**
- * A [TableReader] implementation for the WTF format.
- */
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
- /**
- * The current record.
- */
- private var record: Task? = null
-
- override fun nextRow(): Boolean {
- try {
- val record = reader.read()
- this.record = record
-
- return record != null
- } catch (e: Throwable) {
- this.record = null
- throw e
- }
- }
-
- private val colID = 0
- private val colWorkflowID = 1
- private val colSubmitTime = 2
- private val colWaitTime = 3
- private val colRuntime = 4
- private val colReqNcpus = 5
- private val colParents = 6
- private val colChildren = 7
- private val colGroupID = 8
- private val colUserID = 9
-
- private val typeParents = TableColumnType.Set(TableColumnType.String)
- private val typeChildren = TableColumnType.Set(TableColumnType.String)
-
- override fun resolve(name: String): Int {
- return when (name) {
- TASK_ID -> colID
- TASK_WORKFLOW_ID -> colWorkflowID
- TASK_SUBMIT_TIME -> colSubmitTime
- TASK_WAIT_TIME -> colWaitTime
- TASK_RUNTIME -> colRuntime
- TASK_REQ_NCPUS -> colReqNcpus
- TASK_PARENTS -> colParents
- TASK_CHILDREN -> colChildren
- TASK_GROUP_ID -> colGroupID
- TASK_USER_ID -> colUserID
- else -> -1
- }
- }
-
- override fun isNull(index: Int): Boolean {
- require(index in colID..colUserID) { "Invalid column index" }
- return false
- }
-
- override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInt(index: Int): Int {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- colReqNcpus -> record.requestedCpus
- colGroupID -> record.groupId
- colUserID -> record.userId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getFloat(index: Int): Float {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getDouble(index: Int): Double {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getString(index: Int): String {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colID -> record.id
- colWorkflowID -> record.workflowId
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getUUID(index: Int): UUID? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun getInstant(index: Int): Instant {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colSubmitTime -> record.submitTime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun getDuration(index: Int): Duration {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colWaitTime -> record.waitTime
- colRuntime -> record.runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <T> getList(
- index: Int,
- elementType: Class<T>,
- ): List<T>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun <T> getSet(
- index: Int,
- elementType: Class<T>,
- ): Set<T>? {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- colParents -> typeParents.convertTo(record.parents, elementType)
- colChildren -> typeChildren.convertTo(record.children, elementType)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
- override fun <K, V> getMap(
- index: Int,
- keyType: Class<K>,
- valueType: Class<V>,
- ): Map<K, V>? {
- throw IllegalArgumentException("Invalid column")
- }
-
- override fun close() {
- reader.close()
- }
-}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
deleted file mode 100644
index 1386d2ef..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf
-
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.trace.spi.TableDetails
-import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.wtf.parquet.TaskReadSupport
-import java.nio.file.Path
-
-/**
- * A [TraceFormat] implementation for the Workflow Trace Format (WTF).
- */
-public class WtfTraceFormat : TraceFormat {
- override val name: String = "wtf"
-
- override fun create(path: Path) {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-
- override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
-
- override fun getDetails(
- path: Path,
- table: String,
- ): TableDetails {
- return when (table) {
- TABLE_TASKS ->
- TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int),
- ),
- )
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newReader(
- path: Path,
- table: String,
- projection: List<String>?,
- ): TableReader {
- return when (table) {
- TABLE_TASKS -> {
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
- WtfTaskTableReader(reader)
- }
- else -> throw IllegalArgumentException("Table $table not supported")
- }
- }
-
- override fun newWriter(
- path: Path,
- table: String,
- ): TableWriter {
- throw UnsupportedOperationException("Writing not supported for this format")
- }
-}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
deleted file mode 100644
index a1db0cab..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A task in the Workflow Trace Format.
- */
-internal data class Task(
- val id: String,
- val workflowId: String,
- val submitTime: Instant,
- val waitTime: Duration,
- val runtime: Duration,
- val requestedCpus: Int,
- val groupId: Int,
- val userId: Int,
- val parents: Set<String>,
- val children: Set<String>,
-)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
deleted file mode 100644
index 1f9c506d..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.InitContext
-import org.apache.parquet.hadoop.api.ReadSupport
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Type
-import org.apache.parquet.schema.Types
-import org.opendc.trace.conv.TASK_CHILDREN
-import org.opendc.trace.conv.TASK_GROUP_ID
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_USER_ID
-import org.opendc.trace.conv.TASK_WAIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-
-/**
- * A [ReadSupport] instance for [Task] objects.
- *
- * @param projection The projection of the table to read.
- */
-internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
- /**
- * Mapping of table columns to their Parquet column names.
- */
- private val colMap =
- mapOf(
- TASK_ID to "id",
- TASK_WORKFLOW_ID to "workflow_id",
- TASK_SUBMIT_TIME to "ts_submit",
- TASK_WAIT_TIME to "wait_time",
- TASK_RUNTIME to "runtime",
- TASK_REQ_NCPUS to "resource_amount_requested",
- TASK_PARENTS to "parents",
- TASK_CHILDREN to "children",
- TASK_GROUP_ID to "group_id",
- TASK_USER_ID to "user_id",
- )
-
- override fun init(context: InitContext): ReadContext {
- val projectedSchema =
- if (projection != null) {
- Types.buildMessage()
- .apply {
- val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
-
- for (col in projection) {
- val fieldName = colMap[col] ?: continue
- addField(fieldByName.getValue(fieldName))
- }
- }
- .named(READ_SCHEMA.name)
- } else {
- READ_SCHEMA
- }
- return ReadContext(projectedSchema)
- }
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext,
- ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
-
- companion object {
- /**
- * Parquet read schema for the "tasks" table in the trace.
- */
- @JvmStatic
- val READ_SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("workflow_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("ts_submit"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("wait_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("runtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("resource_amount_requested"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("user_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("group_id"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("children"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list"),
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("parents"),
- )
- .named("task")
- }
-}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
deleted file mode 100644
index 412a4f8b..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.wtf.parquet
-
-import org.apache.parquet.io.api.Converter
-import org.apache.parquet.io.api.GroupConverter
-import org.apache.parquet.io.api.PrimitiveConverter
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-import java.time.Duration
-import java.time.Instant
-import kotlin.math.roundToInt
-import kotlin.math.roundToLong
-
-/**
- * A [RecordMaterializer] for [Task] records.
- */
-internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
- /**
- * State of current record being read.
- */
- private var localID = ""
- private var localWorkflowID = ""
- private var localSubmitTime = Instant.MIN
- private var localWaitTime = Duration.ZERO
- private var localRuntime = Duration.ZERO
- private var localRequestedCpus = 0
- private var localGroupId = 0
- private var localUserId = 0
- private var localParents = mutableSetOf<String>()
- private var localChildren = mutableSetOf<String>()
-
- /**
- * Root converter for the record.
- */
- private val root =
- object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters =
- schema.fields.map { type ->
- when (type.name) {
- "id" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localID = value.toString()
- }
- }
- "workflow_id" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localWorkflowID = value.toString()
- }
- }
- "ts_submit" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localSubmitTime = Instant.ofEpochMilli(value)
- }
- }
- "wait_time" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localWaitTime = Duration.ofMillis(value)
- }
- }
- "runtime" ->
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- localRuntime = Duration.ofMillis(value)
- }
- }
- "resource_amount_requested" ->
- object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- localRequestedCpus = value.roundToInt()
- }
- }
- "group_id" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localGroupId = value
- }
- }
- "user_id" ->
- object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- localUserId = value
- }
- }
- "children" -> RelationConverter(localChildren)
- "parents" -> RelationConverter(localParents)
- else -> error("Unknown column $type")
- }
- }
-
- override fun start() {
- localID = ""
- localWorkflowID = ""
- localSubmitTime = Instant.MIN
- localWaitTime = Duration.ZERO
- localRuntime = Duration.ZERO
- localRequestedCpus = 0
- localGroupId = 0
- localUserId = 0
- localParents.clear()
- localChildren.clear()
- }
-
- override fun end() {}
-
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
-
- override fun getCurrentRecord(): Task =
- Task(
- localID,
- localWorkflowID,
- localSubmitTime,
- localWaitTime,
- localRuntime,
- localRequestedCpus,
- localGroupId,
- localUserId,
- localParents.toSet(),
- localChildren.toSet(),
- )
-
- override fun getRootConverter(): GroupConverter = root
-
- /**
- * Helper class to convert parent and child relations and add them to [relations].
- */
- private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
- private val entryConverter =
- object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- relations.add(value.toString())
- }
-
- override fun addDouble(value: Double) {
- relations.add(value.roundToLong().toString())
- }
- }
-
- private val listConverter =
- object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return entryConverter
- }
-
- override fun start() {}
-
- override fun end() {}
- }
-
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return listConverter
- }
-
- override fun start() {}
-
- override fun end() {}
- }
-}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
deleted file mode 100644
index 32da52ff..00000000
--- a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
+++ /dev/null
@@ -1 +0,0 @@
-org.opendc.trace.wtf.WtfTraceFormat