From 0c0cf25616771cd40a9e401edcba4a5e5016f76e Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Wed, 16 Jul 2025 16:56:28 +0200 Subject: Added Workflows (#359) * Implemented Workflows for OpenDC --- .../org/opendc/trace/conv/ResourceColumns.kt | 12 ++++++ .../formats/opendc/OdcVmResourceTableReader.kt | 30 ++++++++++---- .../formats/opendc/OdcVmResourceTableWriter.kt | 28 ++++++++++--- .../trace/formats/opendc/OdcVmTraceFormat.kt | 8 ++++ .../trace/formats/opendc/parquet/Resource.kt | 6 ++- .../formats/opendc/parquet/ResourceReadSupport.kt | 46 ++++++++++++++++++++-- .../opendc/parquet/ResourceRecordMaterializer.kt | 44 +++++++++++++++++++++ .../parquet/ResourceStateRecordMaterializer.kt | 6 --- 8 files changed, 156 insertions(+), 24 deletions(-) (limited to 'opendc-trace/opendc-trace-api/src/main') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 181ca8e8..3d0341b2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -90,6 +90,18 @@ public val resourceGpuCapacity: String = "gpu_capacity" @JvmField public val resourceGpuMemCapacity: String = "gpu_mem_capacity" +/** + * The parents of the resource that need to be completed before this resource can be used. + */ +@JvmField +public val resourceParents: String = "parents" + +/** + * The children of the resource that cannot be started before this is completed. + */ +@JvmField +public val resourceChildren: String = "children" + /** * Nature of the task. Delayable, interruptible, etc. */ diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt index 97c2847e..495a5d75 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt @@ -22,7 +22,9 @@ package org.opendc.trace.formats.opendc +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline @@ -32,8 +34,10 @@ import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -66,10 +70,15 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity - resourceNature -> colNature - resourceDeadline -> colDeadline resourceGpuCount -> colGpuCount resourceGpuCapacity -> colGpuCapacity + resourceParents -> colParents + resourceChildren -> colChildren + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } @@ -174,7 +185,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader, ): Set? { - throw IllegalArgumentException("Invalid column") + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colParents -> typeParents.convertTo(record.parents, elementType) + colChildren -> typeChildren.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } } override fun getMap( diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt index 310d3dfc..022e288a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt @@ -24,13 +24,17 @@ package org.opendc.trace.formats.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration +import org.opendc.trace.conv.resourceGpuCapacity +import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import java.time.Duration @@ -51,10 +55,12 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter() + private var localChildren = mutableSetOf() + private var localNature: String? = null + private var localDeadline: Long = -1 override fun startRow() { localIsActive = true @@ -66,6 +72,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceGpuCount -> colGpuCount + resourceGpuCapacity -> colGpuCapacity + resourceParents -> colParents + resourceChildren -> colChildren resourceNature -> colNature resourceDeadline -> colDeadline else -> -1 @@ -226,8 +240,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter = emptySet(), + val children: Set = emptySet(), val nature: String? = null, val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt index 75a2bbb2..cd2ccef7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt @@ -29,15 +29,20 @@ import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration +import org.opendc.trace.conv.resourceGpuCapacity +import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime /** @@ -58,6 +63,10 @@ internal class ResourceReadSupport(private val projection: List?) : Read "cpu_capacity" to resourceCpuCapacity, "requiredMemory" to resourceMemCapacity, "mem_capacity" to resourceMemCapacity, + "gpu_count" to resourceGpuCount, + "gpu_capacity" to resourceGpuCapacity, + "parents" to resourceParents, + "children" to resourceChildren, "nature" to resourceNature, "deadline" to resourceDeadline, ) @@ -130,7 +139,7 @@ internal class ResourceReadSupport(private val projection: List?) : Read * Parquet read schema (version 2.1) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = + val READ_SCHEMA_V2_2: MessageType = Types.buildMessage() .addFields( Types @@ -153,6 +162,38 @@ internal class ResourceReadSupport(private val projection: List?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), Types .optional(PrimitiveType.PrimitiveTypeName.BINARY) .`as`(LogicalTypeAnnotation.stringType()) @@ -168,7 +209,6 @@ internal class ResourceReadSupport(private val projection: List?) : Read */ @JvmStatic val READ_SCHEMA: MessageType = - READ_SCHEMA_V2_0 - .union(READ_SCHEMA_V2_1) + READ_SCHEMA_V2_2 } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt index fe92ad65..f9493721 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt @@ -45,6 +45,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali private var localMemCapacity = 0.0 private var localGpuCount = 0 private var localGpuCapacity = 0.0 + private var localParents = mutableSetOf() + private var localChildren = mutableSetOf() private var localNature: String? = null private var localDeadline = -1L @@ -111,6 +113,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localGpuCapacity = value } } + "parents" -> RelationConverter(localParents) + "children" -> RelationConverter(localChildren) "nature" -> object : PrimitiveConverter() { override fun addBinary(value: Binary) { @@ -136,6 +140,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity = 0.0 localGpuCount = 0 localGpuCapacity = 0.0 + localParents.clear() + localChildren.clear() localNature = null localDeadline = -1 } @@ -155,9 +161,47 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity, localGpuCount, localGpuCapacity, + localParents.toSet(), + localChildren.toSet(), localNature, localDeadline, ) override fun getRootConverter(): GroupConverter = root + + /** + * Helper class to convert parent and child relations and add them to [relations]. + */ + private class RelationConverter(private val relations: MutableSet) : GroupConverter() { + private val entryConverter = + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + val str = value.toStringUsingUTF8() + relations.add(str) + } + } + + private val listGroupConverter = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "element" + require(fieldIndex == 0) + return entryConverter + } + + override fun start() {} + + override fun end() {} + } + + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "list" + require(fieldIndex == 0) + return listGroupConverter + } + + override fun start() {} + + override fun end() {} + } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt index a53dcdb2..ee5e56aa 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -99,12 +99,6 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate localGpuUsage = value } } - "flops" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - // Ignore to support v1 format - } - } else -> error("Unknown column $type") } } -- cgit v1.2.3