diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-04-16 09:29:53 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-04-16 09:29:53 +0200 |
| commit | fff89d25bd3c7b874e68261d21695c473c30ed7d (patch) | |
| tree | be368dd745e8119dbdefd9cd0b012c7ff9080a7a /opendc-trace/opendc-trace-wtf/src/main | |
| parent | a7b0afbb5b7059274962ade234a50240677008fd (diff) | |
Revamped the trace system. All TraceFormat files are now in the api m… (#216)
* Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems with not being able to use types of traces
* applied spotless
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src/main')
6 files changed, 0 insertions, 668 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt deleted file mode 100644 index 95582388..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.Task -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the WTF format. - */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader { - /** - * The current record. - */ - private var record: Task? = null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colSubmitTime = 2 - private val colWaitTime = 3 - private val colRuntime = 4 - private val colReqNcpus = 5 - private val colParents = 6 - private val colChildren = 7 - private val colGroupID = 8 - private val colUserID = 9 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colReqNcpus - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - TASK_GROUP_ID -> colGroupID - TASK_USER_ID -> colUserID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colID..colUserID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colReqNcpus -> record.requestedCpus - colGroupID -> record.groupId - colUserID -> record.userId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colID -> record.id - colWorkflowID -> record.workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colSubmitTime -> record.submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colWaitTime -> record.waitTime - colRuntime -> record.runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colParents -> typeParents.convertTo(record.parents, elementType) - colChildren -> typeChildren.convertTo(record.children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt deleted file mode 100644 index 1386d2ef..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.TaskReadSupport -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the Workflow Trace Format (WTF). - */ -public class WtfTraceFormat : TraceFormat { - override val name: String = "wtf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) - WtfTaskTableReader(reader) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt deleted file mode 100644 index a1db0cab..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import java.time.Duration -import java.time.Instant - -/** - * A task in the Workflow Trace Format. - */ -internal data class Task( - val id: String, - val workflowId: String, - val submitTime: Instant, - val waitTime: Duration, - val runtime: Duration, - val requestedCpus: Int, - val groupId: Int, - val userId: Int, - val parents: Set<String>, - val children: Set<String>, -) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt deleted file mode 100644 index 1f9c506d..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID - -/** - * A [ReadSupport] instance for [Task] objects. - * - * @param projection The projection of the table to read. - */ -internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { - /** - * Mapping of table columns to their Parquet column names. - */ - private val colMap = - mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id", - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } - - for (col in projection) { - val fieldName = colMap[col] ?: continue - addField(fieldByName.getValue(fieldName)) - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - ) - .named("task") - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt deleted file mode 100644 index 412a4f8b..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant -import kotlin.math.roundToInt -import kotlin.math.roundToLong - -/** - * A [RecordMaterializer] for [Task] records. - */ -internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() { - /** - * State of current record being read. - */ - private var localID = "" - private var localWorkflowID = "" - private var localSubmitTime = Instant.MIN - private var localWaitTime = Duration.ZERO - private var localRuntime = Duration.ZERO - private var localRequestedCpus = 0 - private var localGroupId = 0 - private var localUserId = 0 - private var localParents = mutableSetOf<String>() - private var localChildren = mutableSetOf<String>() - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localID = value.toString() - } - } - "workflow_id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWorkflowID = value.toString() - } - } - "ts_submit" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localSubmitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWaitTime = Duration.ofMillis(value) - } - } - "runtime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localRuntime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localRequestedCpus = value.roundToInt() - } - } - "group_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGroupId = value - } - } - "user_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localUserId = value - } - } - "children" -> RelationConverter(localChildren) - "parents" -> RelationConverter(localParents) - else -> error("Unknown column $type") - } - } - - override fun start() { - localID = "" - localWorkflowID = "" - localSubmitTime = Instant.MIN - localWaitTime = Duration.ZERO - localRuntime = Duration.ZERO - localRequestedCpus = 0 - localGroupId = 0 - localUserId = 0 - localParents.clear() - localChildren.clear() - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Task = - Task( - localID, - localWorkflowID, - localSubmitTime, - localWaitTime, - localRuntime, - localRequestedCpus, - localGroupId, - localUserId, - localParents.toSet(), - localChildren.toSet(), - ) - - override fun getRootConverter(): GroupConverter = root - - /** - * Helper class to convert parent and child relations and add them to [relations]. - */ - private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { - private val entryConverter = - object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } - - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) - } - } - - private val listConverter = - object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } - - override fun start() {} - - override fun end() {} - } - - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return listConverter - } - - override fun start() {} - - override fun end() {} - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat deleted file mode 100644 index 32da52ff..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat +++ /dev/null @@ -1 +0,0 @@ -org.opendc.trace.wtf.WtfTraceFormat |
