diff options
Diffstat (limited to 'opendc-trace/opendc-trace-wtf')
12 files changed, 0 insertions, 847 deletions
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts deleted file mode 100644 index a3119e5e..00000000 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Support for Workflow Trace Format (WTF) traces in OpenDC" - -// Build configuration -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcTrace.opendcTraceApi) - - implementation(projects.opendcTrace.opendcTraceParquet) - - testImplementation(projects.opendcTrace.opendcTraceTestkit) - testRuntimeOnly(libs.slf4j.simple) -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt deleted file mode 100644 index 95582388..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.util.convertTo -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.Task -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the WTF format. - */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader { - /** - * The current record. - */ - private var record: Task? = null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colWorkflowID = 1 - private val colSubmitTime = 2 - private val colWaitTime = 3 - private val colRuntime = 4 - private val colReqNcpus = 5 - private val colParents = 6 - private val colChildren = 7 - private val colGroupID = 8 - private val colUserID = 9 - - private val typeParents = TableColumnType.Set(TableColumnType.String) - private val typeChildren = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - TASK_ID -> colID - TASK_WORKFLOW_ID -> colWorkflowID - TASK_SUBMIT_TIME -> colSubmitTime - TASK_WAIT_TIME -> colWaitTime - TASK_RUNTIME -> colRuntime - TASK_REQ_NCPUS -> colReqNcpus - TASK_PARENTS -> colParents - TASK_CHILDREN -> colChildren - TASK_GROUP_ID -> colGroupID - TASK_USER_ID -> colUserID - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in colID..colUserID) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colReqNcpus -> record.requestedCpus - colGroupID -> record.groupId - colUserID -> record.userId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - throw IllegalArgumentException("Invalid column") - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colID -> record.id - colWorkflowID -> record.workflowId - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colSubmitTime -> record.submitTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colWaitTime -> record.waitTime - colRuntime -> record.runtime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colParents -> typeParents.convertTo(record.parents, elementType) - colChildren -> typeChildren.convertTo(record.children, elementType) - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt deleted file mode 100644 index 1386d2ef..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.wtf.parquet.TaskReadSupport -import java.nio.file.Path - -/** - * A [TraceFormat] implementation for the Workflow Trace Format (WTF). - */ -public class WtfTraceFormat : TraceFormat { - override val name: String = "wtf" - - override fun create(path: Path) { - throw UnsupportedOperationException("Writing not supported for this format") - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_TASKS -> - TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) - WtfTaskTableReader(reader) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - throw UnsupportedOperationException("Writing not supported for this format") - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt deleted file mode 100644 index a1db0cab..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import java.time.Duration -import java.time.Instant - -/** - * A task in the Workflow Trace Format. - */ -internal data class Task( - val id: String, - val workflowId: String, - val submitTime: Instant, - val waitTime: Duration, - val runtime: Duration, - val requestedCpus: Int, - val groupId: Int, - val userId: Int, - val parents: Set<String>, - val children: Set<String>, -) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt deleted file mode 100644 index 1f9c506d..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Type -import org.apache.parquet.schema.Types -import org.opendc.trace.conv.TASK_CHILDREN -import org.opendc.trace.conv.TASK_GROUP_ID -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_USER_ID -import org.opendc.trace.conv.TASK_WAIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID - -/** - * A [ReadSupport] instance for [Task] objects. - * - * @param projection The projection of the table to read. - */ -internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { - /** - * Mapping of table columns to their Parquet column names. - */ - private val colMap = - mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id", - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val fieldByName = READ_SCHEMA.fields.associateBy { it.name } - - for (col in projection) { - val fieldName = colMap[col] ?: continue - addField(fieldByName.getValue(fieldName)) - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema for the "tasks" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list"), - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents"), - ) - .named("task") - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt deleted file mode 100644 index 412a4f8b..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf.parquet - -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant -import kotlin.math.roundToInt -import kotlin.math.roundToLong - -/** - * A [RecordMaterializer] for [Task] records. - */ -internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() { - /** - * State of current record being read. - */ - private var localID = "" - private var localWorkflowID = "" - private var localSubmitTime = Instant.MIN - private var localWaitTime = Duration.ZERO - private var localRuntime = Duration.ZERO - private var localRequestedCpus = 0 - private var localGroupId = 0 - private var localUserId = 0 - private var localParents = mutableSetOf<String>() - private var localChildren = mutableSetOf<String>() - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localID = value.toString() - } - } - "workflow_id" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWorkflowID = value.toString() - } - } - "ts_submit" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localSubmitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localWaitTime = Duration.ofMillis(value) - } - } - "runtime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localRuntime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localRequestedCpus = value.roundToInt() - } - } - "group_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localGroupId = value - } - } - "user_id" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localUserId = value - } - } - "children" -> RelationConverter(localChildren) - "parents" -> RelationConverter(localParents) - else -> error("Unknown column $type") - } - } - - override fun start() { - localID = "" - localWorkflowID = "" - localSubmitTime = Instant.MIN - localWaitTime = Duration.ZERO - localRuntime = Duration.ZERO - localRequestedCpus = 0 - localGroupId = 0 - localUserId = 0 - localParents.clear() - localChildren.clear() - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Task = - Task( - localID, - localWorkflowID, - localSubmitTime, - localWaitTime, - localRuntime, - localRequestedCpus, - localGroupId, - localUserId, - localParents.toSet(), - localChildren.toSet(), - ) - - override fun getRootConverter(): GroupConverter = root - - /** - * Helper class to convert parent and child relations and add them to [relations]. - */ - private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { - private val entryConverter = - object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } - - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) - } - } - - private val listConverter = - object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } - - override fun start() {} - - override fun end() {} - } - - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return listConverter - } - - override fun start() {} - - override fun end() {} - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat deleted file mode 100644 index 32da52ff..00000000 --- a/opendc-trace/opendc-trace-wtf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat +++ /dev/null @@ -1 +0,0 @@ -org.opendc.trace.wtf.WtfTraceFormat diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt deleted file mode 100644 index ad49cce0..00000000 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.wtf - -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertDoesNotThrow -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Nested -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.trace.TableColumn -import org.opendc.trace.TableReader -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.trace.testkit.TableReaderTestKit -import java.nio.file.Paths -import java.time.Duration -import java.time.Instant - -/** - * Test suite for the [WtfTraceFormat] class. - */ -@DisplayName("WTF TraceFormat") -class WtfTraceFormatTest { - private val format = WtfTraceFormat() - - @Test - fun testTables() { - val path = Paths.get("src/test/resources/wtf-trace") - assertEquals(listOf(TABLE_TASKS), format.getTables(path)) - } - - @Test - fun testTableExists() { - val path = Paths.get("src/test/resources/wtf-trace") - assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } - } - - @Test - fun testTableDoesNotExist() { - val path = Paths.get("src/test/resources/wtf-trace") - - assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } - } - - /** - * Smoke test for parsing WTF traces. - */ - @Test - fun testTableReader() { - val path = Paths.get("src/test/resources/wtf-trace") - val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS)) - - assertAll( - { assertTrue(reader.nextRow()) }, - { assertEquals("362334516345962206", reader.getString(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) }, - { - assertEquals( - setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java), - ) - }, - ) - - assertAll( - { assertTrue(reader.nextRow()) }, - { assertEquals("502010169100446658", reader.getString(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) }, - { - assertEquals( - setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java), - ) - }, - ) - - reader.close() - } - - @DisplayName("TableReader for Tasks") - @Nested - inner class TasksTableReaderTest : TableReaderTestKit() { - override lateinit var reader: TableReader - override lateinit var columns: List<TableColumn> - - @BeforeEach - fun setUp() { - val path = Paths.get("src/test/resources/wtf-trace") - - columns = format.getDetails(path, TABLE_TASKS).columns - reader = format.newReader(path, TABLE_TASKS, null) - } - } - - @DisplayName("TableReader for Tasks (Shell trace)") - @Nested - inner class ShellTasksTableReaderTest : TableReaderTestKit() { - override lateinit var reader: TableReader - override lateinit var columns: List<TableColumn> - - @BeforeEach - fun setUp() { - val path = Paths.get("src/test/resources/shell") - - columns = format.getDetails(path, TABLE_TASKS).columns - reader = format.newReader(path, TABLE_TASKS, null) - } - } -} diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet Binary files differdeleted file mode 100755 index 31256990..00000000 --- a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet +++ /dev/null diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet Binary files differdeleted file mode 100755 index 872469d5..00000000 --- a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet +++ /dev/null diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json deleted file mode 100755 index 5949ab59..00000000 --- a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json +++ /dev/null @@ -1 +0,0 @@ -{"total_workflows": 3403, "total_tasks": 10208, "domain": "Industrial", "date_start": null, "date_end": null, "num_sites": 3403, "num_resources": 10208.0, "num_users": 1, "num_groups": 1, "total_resource_seconds": 89229.863, "authors": ["Shenjun Ma", "Alexey Ilyushkin", "Alexander Stegehuis", "Alexandru Iosup"], "min_resource_task": 1.0, "max_resource_task": 1.0, "std_resource_task": 0.0, "mean_resource_task": 1.0, "median_resource_task": 1.0, "first_quartile_resource_task": 1.0, "third_quartile_resource_task": 1.0, "cov_resource_task": 0.0, "min_memory": -1, "max_memory": -1, "std_memory": 0.0, "mean_memory": -1.0, "median_memory": -1, "first_quartile_memory": -1, "third_quartile_memory": -1, "cov_memory": -0.0, "min_network_usage": -1, "max_network_usage": -1, "std_network_usage": 0.0, "mean_network_usage": -1.0, "median_network_usage": -1, "first_quartile_network_usage": -1, "third_quartile_network_usage": -1, "cov_network_usage": -0.0, "min_disk_space_usage": -1, "max_disk_space_usage": -1, "std_disk_space_usage": 0.0, "mean_disk_space_usage": -1.0, "median_disk_space_usage": -1, "first_quartile_disk_space_usage": -1, "third_quartile_disk_space_usage": -1, "cov_disk_space_usage": -0.0, "min_energy": -1, "max_energy": -1, "std_energy": 0.0, "mean_energy": -1.0, "median_energy": -1, "first_quartile_energy": -1, "third_quartile_energy": -1, "cov_energy": -0.0, "workload_description": "Chronos is a trace from Shell's Chronos IoT production system. It contains pipelines where sensor data is obtained, checked if values are within range (e.g. temperature, operational status, etc.), and the outcomes are written to persistent storage."}
\ No newline at end of file diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet Binary files differdeleted file mode 100644 index d2044038..00000000 --- a/opendc-trace/opendc-trace-wtf/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet +++ /dev/null |
