diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-07-16 16:56:28 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-16 16:56:28 +0200 |
| commit | 0c0cf25616771cd40a9e401edcba4a5e5016f76e (patch) | |
| tree | 90fa673939a6c4c53900a6aa6eef073ad2957e34 /opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace | |
| parent | 089c449762950b4322c04f73ef7fe0e10af615df (diff) | |
Added Workflows (#359)
* Implemented Workflows for OpenDC
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace')
8 files changed, 156 insertions, 24 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 181ca8e8..3d0341b2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -91,6 +91,18 @@ public val resourceGpuCapacity: String = "gpu_capacity" public val resourceGpuMemCapacity: String = "gpu_mem_capacity" /** + * The parents of the resource that need to be completed before this resource can be used. + */ +@JvmField +public val resourceParents: String = "parents" + +/** + * The children of the resource that cannot be started before this is completed. + */ +@JvmField +public val resourceChildren: String = "children" + +/** * Nature of the task. Delayable, interruptible, etc. */ @JvmField diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt index 97c2847e..495a5d75 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt @@ -22,7 +22,9 @@ package org.opendc.trace.formats.opendc +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline @@ -32,8 +34,10 @@ import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration import java.time.Instant @@ -66,10 +70,15 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 - private val colNature = 6 - private val colDeadline = 7 - private val colGpuCapacity = 8 - private val colGpuCount = 9 + private val colGpuCapacity = 6 + private val colGpuCount = 7 + private val colParents = 8 + private val colChildren = 9 + private val colNature = 10 + private val colDeadline = 11 + + private val typeParents = TableColumnType.Set(TableColumnType.String) + private val typeChildren = TableColumnType.Set(TableColumnType.String) override fun resolve(name: String): Int { return when (name) { @@ -79,10 +88,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity - resourceNature -> colNature - resourceDeadline -> colDeadline resourceGpuCount -> colGpuCount resourceGpuCapacity -> colGpuCapacity + resourceParents -> colParents + resourceChildren -> colChildren + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } @@ -174,7 +185,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R index: Int, elementType: Class<T>, ): Set<T>? { - throw IllegalArgumentException("Invalid column") + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colParents -> typeParents.convertTo(record.parents, elementType) + colChildren -> typeChildren.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } } override fun <K, V> getMap( diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt index 310d3dfc..022e288a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt @@ -24,13 +24,17 @@ package org.opendc.trace.formats.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration +import org.opendc.trace.conv.resourceGpuCapacity +import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import java.time.Duration @@ -51,10 +55,12 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private var localCpuCount: Int = 0 private var localCpuCapacity: Double = Double.NaN private var localMemCapacity: Double = Double.NaN - private var localNature: String? = null - private var localDeadline: Long = -1 private var localGpuCount: Int = 0 private var localGpuCapacity: Double = Double.NaN + private var localParents = mutableSetOf<String>() + private var localChildren = mutableSetOf<String>() + private var localNature: String? = null + private var localDeadline: Long = -1 override fun startRow() { localIsActive = true @@ -66,6 +72,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour localMemCapacity = Double.NaN localGpuCount = 0 localGpuCapacity = Double.NaN + localParents.clear() + localChildren.clear() localNature = null localDeadline = -1L } @@ -83,6 +91,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour localMemCapacity, localGpuCount, localGpuCapacity, + localParents, + localChildren, localNature, localDeadline, ), @@ -97,6 +107,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceGpuCount -> colGpuCount + resourceGpuCapacity -> colGpuCapacity + resourceParents -> colParents + resourceChildren -> colChildren resourceNature -> colNature resourceDeadline -> colDeadline else -> -1 @@ -226,8 +240,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 - private val colNature = 6 - private val colDeadline = 7 - private val colGpuCount = 8 - private val colGpuCapacity = 9 + private val colGpuCount = 6 + private val colGpuCapacity = 7 + private val colParents = 8 + private val colChildren = 9 + private val colNature = 10 + private val colDeadline = 11 } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt index e2013182..74e880be 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt @@ -37,13 +37,17 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration +import org.opendc.trace.conv.resourceGpuCapacity +import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceStateCpuUsage import org.opendc.trace.conv.resourceStateDuration import org.opendc.trace.conv.resourceStateTimestamp @@ -102,6 +106,10 @@ public class OdcVmTraceFormat : TraceFormat { TableColumn(resourceCpuCount, TableColumnType.Int), TableColumn(resourceCpuCapacity, TableColumnType.Double), TableColumn(resourceMemCapacity, TableColumnType.Double), + TableColumn(resourceGpuCount, TableColumnType.Int), + TableColumn(resourceGpuCapacity, TableColumnType.Double), + TableColumn(resourceParents, TableColumnType.Set(TableColumnType.String)), + TableColumn(resourceChildren, TableColumnType.Set(TableColumnType.String)), TableColumn(resourceNature, TableColumnType.String), TableColumn(resourceDeadline, TableColumnType.Long), ), diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt index 6747e9ce..d727920a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt @@ -34,8 +34,10 @@ internal data class Resource( val cpuCount: Int, val cpuCapacity: Double, val memCapacity: Double, - val gpuCount: Int, - val gpuCapacity: Double, + val gpuCount: Int = 0, + val gpuCapacity: Double = 0.0, + val parents: Set<String> = emptySet(), + val children: Set<String> = emptySet(), val nature: String? = null, val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt index 75a2bbb2..cd2ccef7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt @@ -29,15 +29,20 @@ import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn +import org.opendc.trace.conv.resourceChildren import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration +import org.opendc.trace.conv.resourceGpuCapacity +import org.opendc.trace.conv.resourceGpuCount import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity import org.opendc.trace.conv.resourceNature +import org.opendc.trace.conv.resourceParents import org.opendc.trace.conv.resourceSubmissionTime /** @@ -58,6 +63,10 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read "cpu_capacity" to resourceCpuCapacity, "requiredMemory" to resourceMemCapacity, "mem_capacity" to resourceMemCapacity, + "gpu_count" to resourceGpuCount, + "gpu_capacity" to resourceGpuCapacity, + "parents" to resourceParents, + "children" to resourceChildren, "nature" to resourceNature, "deadline" to resourceDeadline, ) @@ -130,7 +139,7 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read * Parquet read schema (version 2.1) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = + val READ_SCHEMA_V2_2: MessageType = Types.buildMessage() .addFields( Types @@ -154,6 +163,38 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("gpu_count"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("gpu_capacity"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField( + Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("element"), + ) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types .optional(PrimitiveType.PrimitiveTypeName.BINARY) .`as`(LogicalTypeAnnotation.stringType()) .named("nature"), @@ -168,7 +209,6 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read */ @JvmStatic val READ_SCHEMA: MessageType = - READ_SCHEMA_V2_0 - .union(READ_SCHEMA_V2_1) + READ_SCHEMA_V2_2 } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt index fe92ad65..f9493721 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt @@ -45,6 +45,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali private var localMemCapacity = 0.0 private var localGpuCount = 0 private var localGpuCapacity = 0.0 + private var localParents = mutableSetOf<String>() + private var localChildren = mutableSetOf<String>() private var localNature: String? = null private var localDeadline = -1L @@ -111,6 +113,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localGpuCapacity = value } } + "parents" -> RelationConverter(localParents) + "children" -> RelationConverter(localChildren) "nature" -> object : PrimitiveConverter() { override fun addBinary(value: Binary) { @@ -136,6 +140,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity = 0.0 localGpuCount = 0 localGpuCapacity = 0.0 + localParents.clear() + localChildren.clear() localNature = null localDeadline = -1 } @@ -155,9 +161,47 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity, localGpuCount, localGpuCapacity, + localParents.toSet(), + localChildren.toSet(), localNature, localDeadline, ) override fun getRootConverter(): GroupConverter = root + + /** + * Helper class to convert parent and child relations and add them to [relations]. + */ + private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { + private val entryConverter = + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + val str = value.toStringUsingUTF8() + relations.add(str) + } + } + + private val listGroupConverter = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "element" + require(fieldIndex == 0) + return entryConverter + } + + override fun start() {} + + override fun end() {} + } + + override fun getConverter(fieldIndex: Int): Converter { + // fieldIndex = 0 corresponds to "list" + require(fieldIndex == 0) + return listGroupConverter + } + + override fun start() {} + + override fun end() {} + } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt index a53dcdb2..ee5e56aa 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -99,12 +99,6 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate localGpuUsage = value } } - "flops" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - // Ignore to support v1 format - } - } else -> error("Unknown column $type") } } |
