summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-10-02 16:19:43 +0200
committerGitHub <noreply@github.com>2025-10-02 16:19:43 +0200
commitcb95cb2a5b24ae62c33962c988e89daf9a1a3e91 (patch)
tree4f9d3c9ffea61783389fdb5c5485d08e505c0341 /opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc
parent48ddc082ea301f54717a8ab7c54023f73220e4eb (diff)
Changed the input of "nature" to a boolean to safe space and make loading faster. (#375)
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt14
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt16
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt12
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt71
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt9
9 files changed, 102 insertions, 32 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index 0df52c71..5f29af03 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -87,7 +87,7 @@ public const val TASK_CHILDREN: String = "children"
/**
* Nature of the task. Delayable, interruptible, etc.
*/
-public const val TASK_NATURE: String = "nature"
+public const val TASK_DEFERRABLE: String = "deferrable"
/**
* Deadline of the task.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
index 6c700b0c..51ab9242 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableReader.kt
@@ -28,13 +28,13 @@ import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_CPU_CAPACITY
import org.opendc.trace.conv.TASK_CPU_COUNT
import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DEFERRABLE
import org.opendc.trace.conv.TASK_DURATION
import org.opendc.trace.conv.TASK_GPU_CAPACITY
import org.opendc.trace.conv.TASK_GPU_COUNT
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_MEM_CAPACITY
import org.opendc.trace.conv.TASK_NAME
-import org.opendc.trace.conv.TASK_NATURE
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
import org.opendc.trace.formats.workload.parquet.Task
@@ -76,7 +76,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
private val colGpuCount = 8
private val colParents = 9
private val colChildren = 10
- private val colNature = 11
+ private val colDeferrable = 11
private val colDeadline = 12
private val typeParents = TableColumnType.Set(TableColumnType.Int)
@@ -95,7 +95,7 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
TASK_GPU_CAPACITY -> colGpuCapacity
TASK_PARENTS -> colParents
TASK_CHILDREN -> colChildren
- TASK_NATURE -> colNature
+ TASK_DEFERRABLE -> colDeferrable
TASK_DEADLINE -> colDeadline
else -> -1
}
@@ -106,14 +106,17 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- colNature -> record.nature == null
colDeadline -> record.deadline == -1L
else -> false
}
}
override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ colDeferrable -> record.deferrable
+ else -> throw IllegalArgumentException("Invalid column")
+ }
}
override fun getInt(index: Int): Int {
@@ -156,7 +159,6 @@ internal class TaskTableReader(private val reader: LocalParquetReader<Task>) : T
return when (index) {
colName -> record.name
- colNature -> record.nature
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
index 39be36c1..5e57fd84 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/TaskTableWriter.kt
@@ -28,13 +28,13 @@ import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_CPU_CAPACITY
import org.opendc.trace.conv.TASK_CPU_COUNT
import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DEFERRABLE
import org.opendc.trace.conv.TASK_DURATION
import org.opendc.trace.conv.TASK_GPU_CAPACITY
import org.opendc.trace.conv.TASK_GPU_COUNT
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_MEM_CAPACITY
import org.opendc.trace.conv.TASK_NAME
-import org.opendc.trace.conv.TASK_NATURE
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
import org.opendc.trace.formats.workload.parquet.Task
@@ -61,7 +61,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
private var localGpuCapacity: Double = Double.NaN
private var localParents = mutableSetOf<Int>()
private var localChildren = mutableSetOf<Int>()
- private var localNature: String? = null
+ private var localDeferrable: Boolean = false
private var localDeadline: Long = -1
override fun startRow() {
@@ -77,7 +77,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
localGpuCapacity = Double.NaN
localParents.clear()
localChildren.clear()
- localNature = null
+ localDeferrable = false
localDeadline = -1L
}
@@ -97,7 +97,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
localGpuCapacity,
localParents,
localChildren,
- localNature,
+ localDeferrable,
localDeadline,
),
)
@@ -116,7 +116,7 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
TASK_GPU_CAPACITY -> colGpuCapacity
TASK_PARENTS -> colParents
TASK_CHILDREN -> colChildren
- TASK_NATURE -> colNature
+ TASK_DEFERRABLE -> colNature
TASK_DEADLINE -> colDeadline
else -> -1
}
@@ -126,7 +126,10 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
index: Int,
value: Boolean,
) {
- throw IllegalArgumentException("Invalid column or type [index $index]")
+ when (index) {
+ colNature -> localDeferrable = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
}
override fun setInt(
@@ -181,7 +184,6 @@ internal class TaskTableWriter(private val writer: ParquetWriter<Task>) : TableW
check(localIsActive) { "No active row" }
when (index) {
colName -> localName = value
- colNature -> localNature = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt
index 7af0650e..0f0ba0b8 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/WorkloadTraceFormat.kt
@@ -37,12 +37,12 @@ import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_CPU_CAPACITY
import org.opendc.trace.conv.TASK_CPU_COUNT
import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DEFERRABLE
import org.opendc.trace.conv.TASK_DURATION
import org.opendc.trace.conv.TASK_GPU_CAPACITY
import org.opendc.trace.conv.TASK_GPU_COUNT
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_MEM_CAPACITY
-import org.opendc.trace.conv.TASK_NATURE
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
import org.opendc.trace.formats.workload.parquet.FragmentReadSupport
@@ -97,7 +97,7 @@ public class WorkloadTraceFormat : TraceFormat {
TableColumn(TASK_GPU_CAPACITY, TableColumnType.Double),
TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_NATURE, TableColumnType.String),
+ TableColumn(TASK_DEFERRABLE, TableColumnType.Boolean),
TableColumn(TASK_DEADLINE, TableColumnType.Long),
),
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
index 0ebac5eb..ccc44bde 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/Task.kt
@@ -39,6 +39,6 @@ internal data class Task(
val gpuCapacity: Double = 0.0,
val parents: MutableSet<Int> = mutableSetOf(),
val children: Set<Int> = emptySet(),
- val nature: String? = null,
+ val deferrable: Boolean = false,
val deadline: Long = -1,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
index 4bbb18ac..5b743fbe 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskReadSupport.kt
@@ -33,13 +33,13 @@ import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_CPU_CAPACITY
import org.opendc.trace.conv.TASK_CPU_COUNT
import org.opendc.trace.conv.TASK_DEADLINE
+import org.opendc.trace.conv.TASK_DEFERRABLE
import org.opendc.trace.conv.TASK_DURATION
import org.opendc.trace.conv.TASK_GPU_CAPACITY
import org.opendc.trace.conv.TASK_GPU_COUNT
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_MEM_CAPACITY
import org.opendc.trace.conv.TASK_NAME
-import org.opendc.trace.conv.TASK_NATURE
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_SUBMISSION_TIME
@@ -66,7 +66,7 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
"gpu_capacity" to TASK_GPU_CAPACITY,
"parents" to TASK_PARENTS,
"children" to TASK_CHILDREN,
- "nature" to TASK_NATURE,
+ "deferrable" to TASK_DEFERRABLE,
"deadline" to TASK_DEADLINE,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
index 1b22e2a7..b4946ed3 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskRecordMaterializer.kt
@@ -48,7 +48,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
private var localGpuCapacity = 0.0
private var localParents = mutableSetOf<Int>()
private var localChildren = mutableSetOf<Int>()
- private var localNature: String? = null
+ private var localDeferrable: Boolean = false
private var localDeadline = -1L
/**
@@ -122,10 +122,10 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
}
"parents" -> RelationConverter(localParents)
"children" -> RelationConverter(localChildren)
- "nature" ->
+ "deferrable" ->
object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- localNature = value.toStringUsingUTF8()
+ override fun addBoolean(value: Boolean) {
+ localDeferrable = value
}
}
"deadline" ->
@@ -150,7 +150,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
localGpuCapacity = 0.0
localParents.clear()
localChildren.clear()
- localNature = null
+ localDeferrable = false
localDeadline = -1
}
@@ -172,7 +172,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
localGpuCapacity,
localParents.toMutableSet(),
localChildren.toSet(),
- localNature,
+ localDeferrable,
localDeadline,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt
index f7f5e953..45f8e7c3 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskSchemas.kt
@@ -163,4 +163,73 @@ private val TASK_SCHEMA_V2: MessageType =
)
.named("resource")
-public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V2
+private val TASK_SCHEMA_V3: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("name"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submission_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("gpu_count"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("gpu_capacity"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(
+ PrimitiveType.PrimitiveTypeName.INT32,
+ )
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(
+ Types.optional(
+ PrimitiveType.PrimitiveTypeName.INT32,
+ )
+ .named("element"),
+ )
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BOOLEAN)
+ .named("deferrable"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
+ )
+ .named("resource")
+
+public val TASK_SCHEMA: MessageType = TASK_SCHEMA_V3
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
index a7ce62b8..c245f804 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/workload/parquet/TaskWriteSupport.kt
@@ -24,7 +24,6 @@ package org.opendc.trace.formats.workload.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
import kotlin.math.roundToLong
@@ -79,11 +78,9 @@ internal class TaskWriteSupport : WriteSupport<Task>() {
consumer.addLong(record.memCapacity.roundToLong())
consumer.endField("mem_capacity", 5)
- record.nature?.let {
- consumer.startField("nature", 6)
- consumer.addBinary(Binary.fromCharSequence(it))
- consumer.endField("nature", 6)
- }
+ consumer.startField("deferrable", 6)
+ consumer.addBoolean(record.deferrable)
+ consumer.endField("deferrable", 6)
if (record.deadline != -1L) {
consumer.startField("deadline", 7)