summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-03-05 13:23:57 +0100
committerGitHub <noreply@github.com>2024-03-05 13:23:57 +0100
commit5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-trace/opendc-trace-wtf
parentd28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19
Diffstat (limited to 'opendc-trace/opendc-trace-wtf')
-rw-r--r--opendc-trace/opendc-trace-wtf/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt84
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt43
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt2
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt122
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt217
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt8
7 files changed, 258 insertions, 220 deletions
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
index 599087e1..a3119e5e 100644
--- a/opendc-trace/opendc-trace-wtf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for Workflow Trace Format (WTF) traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 73c1b8a9..95582388 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -62,38 +62,38 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
}
}
- private val COL_ID = 0
- private val COL_WORKFLOW_ID = 1
- private val COL_SUBMIT_TIME = 2
- private val COL_WAIT_TIME = 3
- private val COL_RUNTIME = 4
- private val COL_REQ_NCPUS = 5
- private val COL_PARENTS = 6
- private val COL_CHILDREN = 7
- private val COL_GROUP_ID = 8
- private val COL_USER_ID = 9
-
- private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
- private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
+ private val colID = 0
+ private val colWorkflowID = 1
+ private val colSubmitTime = 2
+ private val colWaitTime = 3
+ private val colRuntime = 4
+ private val colReqNcpus = 5
+ private val colParents = 6
+ private val colChildren = 7
+ private val colGroupID = 8
+ private val colUserID = 9
+
+ private val typeParents = TableColumnType.Set(TableColumnType.String)
+ private val typeChildren = TableColumnType.Set(TableColumnType.String)
override fun resolve(name: String): Int {
return when (name) {
- TASK_ID -> COL_ID
- TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
- TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
- TASK_WAIT_TIME -> COL_WAIT_TIME
- TASK_RUNTIME -> COL_RUNTIME
- TASK_REQ_NCPUS -> COL_REQ_NCPUS
- TASK_PARENTS -> COL_PARENTS
- TASK_CHILDREN -> COL_CHILDREN
- TASK_GROUP_ID -> COL_GROUP_ID
- TASK_USER_ID -> COL_USER_ID
+ TASK_ID -> colID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_WAIT_TIME -> colWaitTime
+ TASK_RUNTIME -> colRuntime
+ TASK_REQ_NCPUS -> colReqNcpus
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ TASK_GROUP_ID -> colGroupID
+ TASK_USER_ID -> colUserID
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in COL_ID..COL_USER_ID) { "Invalid column index" }
+ require(index in colID..colUserID) { "Invalid column index" }
return false
}
@@ -105,9 +105,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_REQ_NCPUS -> record.requestedCpus
- COL_GROUP_ID -> record.groupId
- COL_USER_ID -> record.userId
+ colReqNcpus -> record.requestedCpus
+ colGroupID -> record.groupId
+ colUserID -> record.userId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -127,8 +127,8 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getString(index: Int): String {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
- COL_WORKFLOW_ID -> record.workflowId
+ colID -> record.id
+ colWorkflowID -> record.workflowId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -140,7 +140,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getInstant(index: Int): Instant {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_SUBMIT_TIME -> record.submitTime
+ colSubmitTime -> record.submitTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -148,26 +148,36 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getDuration(index: Int): Duration {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_WAIT_TIME -> record.waitTime
- COL_RUNTIME -> record.runtime
+ colWaitTime -> record.waitTime
+ colRuntime -> record.runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType)
- COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType)
+ colParents -> typeParents.convertTo(record.parents, elementType)
+ colChildren -> typeChildren.convertTo(record.children, elementType)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c25b512c..1386d2ef 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -55,27 +55,35 @@ public class WtfTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_TASKS -> TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int)
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_TASKS -> {
val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
@@ -85,7 +93,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
index 71557f96..a1db0cab 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -38,5 +38,5 @@ internal data class Task(
val groupId: Int,
val userId: Int,
val parents: Set<String>,
- val children: Set<String>
+ val children: Set<String>,
)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 33be38d4..1f9c506d 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -51,18 +51,19 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
/**
* Mapping of table columns to their Parquet column names.
*/
- private val colMap = mapOf(
- TASK_ID to "id",
- TASK_WORKFLOW_ID to "workflow_id",
- TASK_SUBMIT_TIME to "ts_submit",
- TASK_WAIT_TIME to "wait_time",
- TASK_RUNTIME to "runtime",
- TASK_REQ_NCPUS to "resource_amount_requested",
- TASK_PARENTS to "parents",
- TASK_CHILDREN to "children",
- TASK_GROUP_ID to "group_id",
- TASK_USER_ID to "user_id"
- )
+ private val colMap =
+ mapOf(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id",
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -87,7 +88,7 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -95,52 +96,53 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
* Parquet read schema for the "tasks" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("workflow_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("ts_submit"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("wait_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("runtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("resource_amount_requested"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("user_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("group_id"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list")
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("children"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list")
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("parents")
- )
- .named("task")
+ val READ_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
index 055be0c3..412a4f8b 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -39,102 +39,113 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
/**
* State of current record being read.
*/
- private var _id = ""
- private var _workflowId = ""
- private var _submitTime = Instant.MIN
- private var _waitTime = Duration.ZERO
- private var _runtime = Duration.ZERO
- private var _requestedCpus = 0
- private var _groupId = 0
- private var _userId = 0
- private var _parents = mutableSetOf<String>()
- private var _children = mutableSetOf<String>()
+ private var localID = ""
+ private var localWorkflowID = ""
+ private var localSubmitTime = Instant.MIN
+ private var localWaitTime = Duration.ZERO
+ private var localRuntime = Duration.ZERO
+ private var localRequestedCpus = 0
+ private var localGroupId = 0
+ private var localUserId = 0
+ private var localParents = mutableSetOf<String>()
+ private var localChildren = mutableSetOf<String>()
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _id = value.toString()
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localID = value.toString()
+ }
+ }
+ "workflow_id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWorkflowID = value.toString()
+ }
+ }
+ "ts_submit" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localSubmitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWaitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localRuntime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localRequestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localGroupId = value
+ }
+ }
+ "user_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localUserId = value
+ }
+ }
+ "children" -> RelationConverter(localChildren)
+ "parents" -> RelationConverter(localParents)
+ else -> error("Unknown column $type")
}
}
- "workflow_id" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _workflowId = value.toString()
- }
- }
- "ts_submit" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _submitTime = Instant.ofEpochMilli(value)
- }
- }
- "wait_time" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _waitTime = Duration.ofMillis(value)
- }
- }
- "runtime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _runtime = Duration.ofMillis(value)
- }
- }
- "resource_amount_requested" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _requestedCpus = value.roundToInt()
- }
- }
- "group_id" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _groupId = value
- }
- }
- "user_id" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _userId = value
- }
- }
- "children" -> RelationConverter(_children)
- "parents" -> RelationConverter(_parents)
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _workflowId = ""
- _submitTime = Instant.MIN
- _waitTime = Duration.ZERO
- _runtime = Duration.ZERO
- _requestedCpus = 0
- _groupId = 0
- _userId = 0
- _parents.clear()
- _children.clear()
- }
+ override fun start() {
+ localID = ""
+ localWorkflowID = ""
+ localSubmitTime = Instant.MIN
+ localWaitTime = Duration.ZERO
+ localRuntime = Duration.ZERO
+ localRequestedCpus = 0
+ localGroupId = 0
+ localUserId = 0
+ localParents.clear()
+ localChildren.clear()
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): Task = Task(
- _id,
- _workflowId,
- _submitTime,
- _waitTime,
- _runtime,
- _requestedCpus,
- _groupId,
- _userId,
- _parents.toSet(),
- _children.toSet()
- )
+ override fun getCurrentRecord(): Task =
+ Task(
+ localID,
+ localWorkflowID,
+ localSubmitTime,
+ localWaitTime,
+ localRuntime,
+ localRequestedCpus,
+ localGroupId,
+ localUserId,
+ localParents.toSet(),
+ localChildren.toSet(),
+ )
override fun getRootConverter(): GroupConverter = root
@@ -142,25 +153,28 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
* Helper class to convert parent and child relations and add them to [relations].
*/
private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
- private val entryConverter = object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- relations.add(value.toString())
- }
+ private val entryConverter =
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
- override fun addDouble(value: Double) {
- relations.add(value.roundToLong().toString())
+ override fun addDouble(value: Double) {
+ relations.add(value.roundToLong().toString())
+ }
}
- }
- private val listConverter = object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return entryConverter
- }
+ private val listConverter =
+ object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
- override fun start() {}
- override fun end() {}
- }
+ override fun start() {}
+
+ override fun end() {}
+ }
override fun getConverter(fieldIndex: Int): Converter {
require(fieldIndex == 0)
@@ -168,6 +182,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
}
override fun start() {}
+
override fun end() {}
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0457098c..ad49cce0 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -87,9 +87,9 @@ class WtfTraceFormatTest {
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.getSet(TASK_PARENTS, String::class.java)
+ reader.getSet(TASK_PARENTS, String::class.java),
)
- }
+ },
)
assertAll(
@@ -101,9 +101,9 @@ class WtfTraceFormatTest {
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.getSet(TASK_PARENTS, String::class.java)
+ reader.getSet(TASK_PARENTS, String::class.java),
)
- }
+ },
)
reader.close()