From 5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 5 Mar 2024 13:23:57 +0100 Subject: Updated package versions, updated web server tests. (#207) * Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19 --- .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 84 ++++---- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 43 ++-- .../kotlin/org/opendc/trace/wtf/parquet/Task.kt | 2 +- .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 122 ++++++------ .../trace/wtf/parquet/TaskRecordMaterializer.kt | 217 +++++++++++---------- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 8 +- 6 files changed, 257 insertions(+), 219 deletions(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 73c1b8a9..95582388 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -62,38 +62,38 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 - - private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) - private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + private val colID = 0 + private val colWorkflowID = 1 + private val colSubmitTime = 2 + private val colWaitTime = 3 + private val colRuntime = 4 + private val colReqNcpus = 5 + private val colParents = 6 + private val colChildren = 7 + private val colGroupID = 8 + private val colUserID = 9 + + private val typeParents = TableColumnType.Set(TableColumnType.String) + private val typeChildren = TableColumnType.Set(TableColumnType.String) override fun resolve(name: String): Int { return when (name) { - TASK_ID -> COL_ID - TASK_WORKFLOW_ID -> COL_WORKFLOW_ID - TASK_SUBMIT_TIME -> COL_SUBMIT_TIME - TASK_WAIT_TIME -> COL_WAIT_TIME - TASK_RUNTIME -> COL_RUNTIME - TASK_REQ_NCPUS -> COL_REQ_NCPUS - TASK_PARENTS -> COL_PARENTS - TASK_CHILDREN -> COL_CHILDREN - TASK_GROUP_ID -> COL_GROUP_ID - TASK_USER_ID -> COL_USER_ID + TASK_ID -> colID + TASK_WORKFLOW_ID -> colWorkflowID + TASK_SUBMIT_TIME -> colSubmitTime + TASK_WAIT_TIME -> colWaitTime + TASK_RUNTIME -> colRuntime + TASK_REQ_NCPUS -> colReqNcpus + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_GROUP_ID -> colGroupID + TASK_USER_ID -> colUserID else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in COL_ID..COL_USER_ID) { "Invalid column index" } + require(index in colID..colUserID) { "Invalid column index" } return false } @@ -105,9 +105,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_REQ_NCPUS -> record.requestedCpus - COL_GROUP_ID -> record.groupId - COL_USER_ID -> record.userId + colReqNcpus -> record.requestedCpus + colGroupID -> record.groupId + colUserID -> record.userId else -> throw IllegalArgumentException("Invalid column") } } @@ -127,8 +127,8 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) override fun getString(index: Int): String { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId + colID -> record.id + colWorkflowID -> record.workflowId else -> throw IllegalArgumentException("Invalid column") } } @@ -140,7 +140,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) override fun getInstant(index: Int): Instant { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_SUBMIT_TIME -> record.submitTime + colSubmitTime -> record.submitTime else -> throw IllegalArgumentException("Invalid column") } } @@ -148,26 +148,36 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) override fun getDuration(index: Int): Duration { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime + colWaitTime -> record.waitTime + colRuntime -> record.runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun getList(index: Int, elementType: Class): List? { + override fun getList( + index: Int, + elementType: Class, + ): List? { throw IllegalArgumentException("Invalid column") } - override fun getSet(index: Int, elementType: Class): Set? { + override fun getSet( + index: Int, + elementType: Class, + ): Set? { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) - COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + colParents -> typeParents.convertTo(record.parents, elementType) + colChildren -> typeChildren.convertTo(record.children, elementType) else -> throw IllegalArgumentException("Invalid column") } } - override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index c25b512c..1386d2ef 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -55,27 +55,35 @@ public class WtfTraceFormat : TraceFormat { override fun getTables(path: Path): List = listOf(TABLE_TASKS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_TASKS -> TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int) + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List?, + ): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) @@ -85,7 +93,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt index 71557f96..a1db0cab 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt @@ -38,5 +38,5 @@ internal data class Task( val groupId: Int, val userId: Int, val parents: Set, - val children: Set + val children: Set, ) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 33be38d4..1f9c506d 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -51,18 +51,19 @@ internal class TaskReadSupport(private val projection: List?) : ReadSupp /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id" - ) + private val colMap = + mapOf( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id", + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -87,7 +88,7 @@ internal class TaskReadSupport(private val projection: List?) : ReadSupp configuration: Configuration, keyValueMetaData: Map, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) companion object { @@ -95,52 +96,53 @@ internal class TaskReadSupport(private val projection: List?) : ReadSupp * Parquet read schema for the "tasks" table in the trace. */ @JvmStatic - val READ_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list") - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list") - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents") - ) - .named("task") + val READ_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("workflow_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("ts_submit"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("wait_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("runtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("resource_amount_requested"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("user_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("group_id"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + ) + .named("task") } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt index 055be0c3..412a4f8b 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt @@ -39,102 +39,113 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< /** * State of current record being read. */ - private var _id = "" - private var _workflowId = "" - private var _submitTime = Instant.MIN - private var _waitTime = Duration.ZERO - private var _runtime = Duration.ZERO - private var _requestedCpus = 0 - private var _groupId = 0 - private var _userId = 0 - private var _parents = mutableSetOf() - private var _children = mutableSetOf() + private var localID = "" + private var localWorkflowID = "" + private var localSubmitTime = Instant.MIN + private var localWaitTime = Duration.ZERO + private var localRuntime = Duration.ZERO + private var localRequestedCpus = 0 + private var localGroupId = 0 + private var localUserId = 0 + private var localParents = mutableSetOf() + private var localChildren = mutableSetOf() /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _id = value.toString() + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localID = value.toString() + } + } + "workflow_id" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localWorkflowID = value.toString() + } + } + "ts_submit" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localSubmitTime = Instant.ofEpochMilli(value) + } + } + "wait_time" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localWaitTime = Duration.ofMillis(value) + } + } + "runtime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localRuntime = Duration.ofMillis(value) + } + } + "resource_amount_requested" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localRequestedCpus = value.roundToInt() + } + } + "group_id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localGroupId = value + } + } + "user_id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localUserId = value + } + } + "children" -> RelationConverter(localChildren) + "parents" -> RelationConverter(localParents) + else -> error("Unknown column $type") } } - "workflow_id" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _workflowId = value.toString() - } - } - "ts_submit" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _submitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _waitTime = Duration.ofMillis(value) - } - } - "runtime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _runtime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _requestedCpus = value.roundToInt() - } - } - "group_id" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _groupId = value - } - } - "user_id" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _userId = value - } - } - "children" -> RelationConverter(_children) - "parents" -> RelationConverter(_parents) - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _workflowId = "" - _submitTime = Instant.MIN - _waitTime = Duration.ZERO - _runtime = Duration.ZERO - _requestedCpus = 0 - _groupId = 0 - _userId = 0 - _parents.clear() - _children.clear() - } + override fun start() { + localID = "" + localWorkflowID = "" + localSubmitTime = Instant.MIN + localWaitTime = Duration.ZERO + localRuntime = Duration.ZERO + localRequestedCpus = 0 + localGroupId = 0 + localUserId = 0 + localParents.clear() + localChildren.clear() + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): Task = Task( - _id, - _workflowId, - _submitTime, - _waitTime, - _runtime, - _requestedCpus, - _groupId, - _userId, - _parents.toSet(), - _children.toSet() - ) + override fun getCurrentRecord(): Task = + Task( + localID, + localWorkflowID, + localSubmitTime, + localWaitTime, + localRuntime, + localRequestedCpus, + localGroupId, + localUserId, + localParents.toSet(), + localChildren.toSet(), + ) override fun getRootConverter(): GroupConverter = root @@ -142,25 +153,28 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< * Helper class to convert parent and child relations and add them to [relations]. */ private class RelationConverter(private val relations: MutableSet) : GroupConverter() { - private val entryConverter = object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } + private val entryConverter = + object : PrimitiveConverter() { + override fun addLong(value: Long) { + relations.add(value.toString()) + } - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) + override fun addDouble(value: Double) { + relations.add(value.roundToLong().toString()) + } } - } - private val listConverter = object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } + private val listConverter = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return entryConverter + } - override fun start() {} - override fun end() {} - } + override fun start() {} + + override fun end() {} + } override fun getConverter(fieldIndex: Int): Converter { require(fieldIndex == 0) @@ -168,6 +182,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< } override fun start() {} + override fun end() {} } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0457098c..ad49cce0 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -87,9 +87,9 @@ class WtfTraceFormatTest { { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java) + reader.getSet(TASK_PARENTS, String::class.java), ) - } + }, ) assertAll( @@ -101,9 +101,9 @@ class WtfTraceFormatTest { { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java) + reader.getSet(TASK_PARENTS, String::class.java), ) - } + }, ) reader.close() -- cgit v1.2.3