From cb95cb2a5b24ae62c33962c988e89daf9a1a3e91 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Thu, 2 Oct 2025 16:19:43 +0200 Subject: Changed the input of "nature" to a boolean to safe space and make loading faster. (#375) --- .../kotlin/org/opendc/trace/conv/TaskColumns.kt | 2 +- .../trace/formats/workload/TaskTableReader.kt | 14 +++-- .../trace/formats/workload/TaskTableWriter.kt | 16 ++--- .../trace/formats/workload/WorkloadTraceFormat.kt | 4 +- .../opendc/trace/formats/workload/parquet/Task.kt | 2 +- .../formats/workload/parquet/TaskReadSupport.kt | 4 +- .../workload/parquet/TaskRecordMaterializer.kt | 12 ++-- .../trace/formats/workload/parquet/TaskSchemas.kt | 71 +++++++++++++++++++++- .../formats/workload/parquet/TaskWriteSupport.kt | 9 +-- 9 files changed, 102 insertions(+), 32 deletions(-) (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 0df52c71..5f29af03 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -87,7 +87,7 @@ public const val TASK_CHILDREN: String = "children" /** * Nature of the task. Delayable, interruptible, etc. */ -public const val TASK_NATURE: String = "nature" +public const val TASK_DEFERRABLE: String = "deferrable" /** * Deadline of the task. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt index 6c700b0c..51ab9242 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt @@ -28,13 +28,13 @@ import org.opendc.trace.conv.TASK_CHILDREN import org.opendc.trace.conv.TASK_CPU_CAPACITY import org.opendc.trace.conv.TASK_CPU_COUNT import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DEFERRABLE import org.opendc.trace.conv.TASK_DURATION import org.opendc.trace.conv.TASK_GPU_CAPACITY import org.opendc.trace.conv.TASK_GPU_COUNT import org.opendc.trace.conv.TASK_ID import org.opendc.trace.conv.TASK_MEM_CAPACITY import org.opendc.trace.conv.TASK_NAME -import org.opendc.trace.conv.TASK_NATURE import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME import org.opendc.trace.formats.workload.parquet.Task @@ -76,7 +76,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T private val colGpuCount = 8 private val colParents = 9 private val colChildren = 10 - private val colNature = 11 + private val colDeferrable = 11 private val colDeadline = 12 private val typeParents = TableColumnType.Set(TableColumnType.Int) @@ -95,7 +95,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T TASK_GPU_CAPACITY -> colGpuCapacity TASK_PARENTS -> colParents TASK_CHILDREN -> colChildren - TASK_NATURE -> colNature + TASK_DEFERRABLE -> colDeferrable TASK_DEADLINE -> colDeadline else -> -1 } @@ -106,14 +106,17 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - colNature -> record.nature == null colDeadline -> record.deadline == -1L else -> false } } override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colDeferrable -> record.deferrable + else -> throw IllegalArgumentException("Invalid column") + } } override fun getInt(index: Int): Int { @@ -156,7 +159,6 @@ internal class TaskTableReader(private val reader: LocalParquetReader) : T return when (index) { colName -> record.name - colNature -> record.nature else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt index 39be36c1..5e57fd84 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt @@ -28,13 +28,13 @@ import org.opendc.trace.conv.TASK_CHILDREN import org.opendc.trace.conv.TASK_CPU_CAPACITY import org.opendc.trace.conv.TASK_CPU_COUNT import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DEFERRABLE import org.opendc.trace.conv.TASK_DURATION import org.opendc.trace.conv.TASK_GPU_CAPACITY import org.opendc.trace.conv.TASK_GPU_COUNT import org.opendc.trace.conv.TASK_ID import org.opendc.trace.conv.TASK_MEM_CAPACITY import org.opendc.trace.conv.TASK_NAME -import org.opendc.trace.conv.TASK_NATURE import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME import org.opendc.trace.formats.workload.parquet.Task @@ -61,7 +61,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW private var localGpuCapacity: Double = Double.NaN private var localParents = mutableSetOf() private var localChildren = mutableSetOf() - private var localNature: String? = null + private var localDeferrable: Boolean = false private var localDeadline: Long = -1 override fun startRow() { @@ -77,7 +77,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW localGpuCapacity = Double.NaN localParents.clear() localChildren.clear() - localNature = null + localDeferrable = false localDeadline = -1L } @@ -97,7 +97,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW localGpuCapacity, localParents, localChildren, - localNature, + localDeferrable, localDeadline, ), ) @@ -116,7 +116,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW TASK_GPU_CAPACITY -> colGpuCapacity TASK_PARENTS -> colParents TASK_CHILDREN -> colChildren - TASK_NATURE -> colNature + TASK_DEFERRABLE -> colNature TASK_DEADLINE -> colDeadline else -> -1 } @@ -126,7 +126,10 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW index: Int, value: Boolean, ) { - throw IllegalArgumentException("Invalid column or type [index $index]") + when (index) { + colNature -> localDeferrable = value + else -> throw IllegalArgumentException("Invalid column index $index") + } } override fun setInt( @@ -181,7 +184,6 @@ internal class TaskTableWriter(private val writer: ParquetWriter) : TableW check(localIsActive) { "No active row" } when (index) { colName -> localName = value - colNature -> localNature = value else -> throw IllegalArgumentException("Invalid column index $index") } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt index 7af0650e..0f0ba0b8 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt @@ -37,12 +37,12 @@ import org.opendc.trace.conv.TASK_CHILDREN import org.opendc.trace.conv.TASK_CPU_CAPACITY import org.opendc.trace.conv.TASK_CPU_COUNT import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DEFERRABLE import org.opendc.trace.conv.TASK_DURATION import org.opendc.trace.conv.TASK_GPU_CAPACITY import org.opendc.trace.conv.TASK_GPU_COUNT import org.opendc.trace.conv.TASK_ID import org.opendc.trace.conv.TASK_MEM_CAPACITY -import org.opendc.trace.conv.TASK_NATURE import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME import org.opendc.trace.formats.workload.parquet.FragmentReadSupport @@ -97,7 +97,7 @@ public class WorkloadTraceFormat : TraceFormat { TableColumn(TASK_GPU_CAPACITY, TableColumnType.Double), TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_NATURE, TableColumnType.String), + TableColumn(TASK_DEFERRABLE, TableColumnType.Boolean), TableColumn(TASK_DEADLINE, TableColumnType.Long), ), ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt index 0ebac5eb..ccc44bde 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt @@ -39,6 +39,6 @@ internal data class Task( val gpuCapacity: Double = 0.0, val parents: MutableSet = mutableSetOf(), val children: Set = emptySet(), - val nature: String? = null, + val deferrable: Boolean = false, val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt index 4bbb18ac..5b743fbe 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt @@ -33,13 +33,13 @@ import org.opendc.trace.conv.TASK_CHILDREN import org.opendc.trace.conv.TASK_CPU_CAPACITY import org.opendc.trace.conv.TASK_CPU_COUNT import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DEFERRABLE import org.opendc.trace.conv.TASK_DURATION import org.opendc.trace.conv.TASK_GPU_CAPACITY import org.opendc.trace.conv.TASK_GPU_COUNT import org.opendc.trace.conv.TASK_ID import org.opendc.trace.conv.TASK_MEM_CAPACITY import org.opendc.trace.conv.TASK_NAME -import org.opendc.trace.conv.TASK_NATURE import org.opendc.trace.conv.TASK_PARENTS import org.opendc.trace.conv.TASK_SUBMISSION_TIME @@ -66,7 +66,7 @@ internal class TaskReadSupport(private val projection: List?) : ReadSupp "gpu_capacity" to TASK_GPU_CAPACITY, "parents" to TASK_PARENTS, "children" to TASK_CHILDREN, - "nature" to TASK_NATURE, + "deferrable" to TASK_DEFERRABLE, "deadline" to TASK_DEADLINE, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt index 1b22e2a7..b4946ed3 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt @@ -48,7 +48,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< private var localGpuCapacity = 0.0 private var localParents = mutableSetOf() private var localChildren = mutableSetOf() - private var localNature: String? = null + private var localDeferrable: Boolean = false private var localDeadline = -1L /** @@ -122,10 +122,10 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< } "parents" -> RelationConverter(localParents) "children" -> RelationConverter(localChildren) - "nature" -> + "deferrable" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localNature = value.toStringUsingUTF8() + override fun addBoolean(value: Boolean) { + localDeferrable = value } } "deadline" -> @@ -150,7 +150,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< localGpuCapacity = 0.0 localParents.clear() localChildren.clear() - localNature = null + localDeferrable = false localDeadline = -1 } @@ -172,7 +172,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< localGpuCapacity, localParents.toMutableSet(), localChildren.toSet(), - localNature, + localDeferrable, localDeadline, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt index f7f5e953..45f8e7c3 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt @@ -163,4 +163,73 @@ private val TASK_SCHEMA_V2: MessageType = ) .named("resource") -public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V2 +private val TASK_SCHEMA_V3: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("name"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submission_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional( + PrimitiveType.PrimitiveTypeName.INT32, + ) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BOOLEAN) + .named("deferrable"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), + ) + .named("resource") + +public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V3 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt index a7ce62b8..c245f804 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt @@ -24,7 +24,6 @@ package org.opendc.trace.formats.workload.parquet import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary import org.apache.parquet.io.api.RecordConsumer import kotlin.math.roundToLong @@ -79,11 +78,9 @@ internal class TaskWriteSupport : WriteSupport() { consumer.addLong(record.memCapacity.roundToLong()) consumer.endField("mem_capacity", 5) - record.nature?.let { - consumer.startField("nature", 6) - consumer.addBinary(Binary.fromCharSequence(it)) - consumer.endField("nature", 6) - } + consumer.startField("deferrable", 6) + consumer.addBoolean(record.deferrable) + consumer.endField("deferrable", 6) if (record.deadline != -1L) { consumer.startField("deadline", 7) -- cgit v1.2.3