summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-api/src')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt14
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt20
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt25
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt18
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt18
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt19
-rw-r--r--opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquetbin4597 -> 5407 bytes
9 files changed, 115 insertions, 5 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index 9a826418..d0f56bff 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -43,7 +43,7 @@ public val resourceClusterID: String = "cluster_id"
public val resourceSubmissionTime: String = "submission_time"
/**
- * Start time for the resource.
+ * Carbon intensity of the resource.
*/
@JvmField
public val resourceCarbonIntensity: String = "carbon_intensity"
@@ -71,3 +71,15 @@ public val resourceCpuCapacity: String = "cpu_capacity"
*/
@JvmField
public val resourceMemCapacity: String = "mem_capacity"
+
+/**
+ * Nature of the task. Delayable, interruptible, etc.
+ */
+@JvmField
+public val resourceNature: String = "nature"
+
+/**
+ * Deadline of the task.
+ */
+@JvmField
+public val resourceDeadline: String = "deadline"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
index 9c489bfd..10f60658 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt
@@ -25,9 +25,11 @@ package org.opendc.trace.formats.opendc
import org.opendc.trace.TableReader
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceDeadline
import org.opendc.trace.conv.resourceDuration
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceNature
import org.opendc.trace.conv.resourceSubmissionTime
import org.opendc.trace.formats.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -62,6 +64,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
private val colCpuCount = 3
private val colCpuCapacity = 4
private val colMemCapacity = 5
+ private val colNature = 6
+ private val colDeadline = 7
override fun resolve(name: String): Int {
return when (name) {
@@ -71,13 +75,21 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
resourceCpuCount -> colCpuCount
resourceCpuCapacity -> colCpuCapacity
resourceMemCapacity -> colMemCapacity
+ resourceNature -> colNature
+ resourceDeadline -> colDeadline
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..colMemCapacity) { "Invalid column index" }
- return false
+ require(index in 0..colDeadline) { "Invalid column index" }
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ colNature -> record.nature == null
+ colDeadline -> record.deadline == -1L
+ else -> false
+ }
}
override fun getBoolean(index: Int): Boolean {
@@ -97,6 +109,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
colDurationTime -> record.durationTime
+ colDeadline -> record.deadline
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -115,11 +128,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- override fun getString(index: Int): String {
+ override fun getString(index: Int): String? {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
colID -> record.id
+ colNature -> record.nature
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
index 19409fa7..2b8db7f1 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt
@@ -26,9 +26,11 @@ import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceDeadline
import org.opendc.trace.conv.resourceDuration
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceNature
import org.opendc.trace.conv.resourceSubmissionTime
import org.opendc.trace.formats.opendc.parquet.Resource
import java.time.Duration
@@ -49,6 +51,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private var localCpuCount: Int = 0
private var localCpuCapacity: Double = Double.NaN
private var localMemCapacity: Double = Double.NaN
+ private var localNature: String? = null
+ private var localDeadline: Long = -1
override fun startRow() {
localIsActive = true
@@ -58,12 +62,25 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
localCpuCount = 0
localCpuCapacity = Double.NaN
localMemCapacity = Double.NaN
+ localNature = null
+ localDeadline = -1L
}
override fun endRow() {
check(localIsActive) { "No active row" }
localIsActive = false
- writer.write(Resource(localId, localSubmissionTime, localDuration, localCpuCount, localCpuCapacity, localMemCapacity))
+ writer.write(
+ Resource(
+ localId,
+ localSubmissionTime,
+ localDuration,
+ localCpuCount,
+ localCpuCapacity,
+ localMemCapacity,
+ localNature,
+ localDeadline,
+ ),
+ )
}
override fun resolve(name: String): Int {
@@ -74,6 +91,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
resourceCpuCount -> colCpuCount
resourceCpuCapacity -> colCpuCapacity
resourceMemCapacity -> colMemCapacity
+ resourceNature -> colNature
+ resourceDeadline -> colDeadline
else -> -1
}
}
@@ -103,6 +122,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
check(localIsActive) { "No active row" }
when (index) {
colDuration -> localDuration = value
+ colDeadline -> localDeadline = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -133,6 +153,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
check(localIsActive) { "No active row" }
when (index) {
colID -> localId = value
+ colNature -> localNature = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -197,4 +218,6 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private val colCpuCount = 3
private val colCpuCapacity = 4
private val colMemCapacity = 5
+ private val colNature = 6
+ private val colDeadline = 7
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
index b75cf091..e2013182 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt
@@ -39,9 +39,11 @@ import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceDeadline
import org.opendc.trace.conv.resourceDuration
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceNature
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.conv.resourceStateDuration
import org.opendc.trace.conv.resourceStateTimestamp
@@ -100,6 +102,8 @@ public class OdcVmTraceFormat : TraceFormat {
TableColumn(resourceCpuCount, TableColumnType.Int),
TableColumn(resourceCpuCapacity, TableColumnType.Double),
TableColumn(resourceMemCapacity, TableColumnType.Double),
+ TableColumn(resourceNature, TableColumnType.String),
+ TableColumn(resourceDeadline, TableColumnType.Long),
),
)
TABLE_RESOURCE_STATES ->
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
index ea404c65..00922d4f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt
@@ -34,4 +34,6 @@ internal data class Resource(
val cpuCount: Int,
val cpuCapacity: Double,
val memCapacity: Double,
+ val nature: String? = null,
+ val deadline: Long = -1,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
index e5a28a12..75a2bbb2 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt
@@ -33,9 +33,11 @@ import org.apache.parquet.schema.Types
import org.opendc.trace.TableColumn
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceDeadline
import org.opendc.trace.conv.resourceDuration
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceNature
import org.opendc.trace.conv.resourceSubmissionTime
/**
@@ -56,6 +58,8 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
"cpu_capacity" to resourceCpuCapacity,
"requiredMemory" to resourceMemCapacity,
"mem_capacity" to resourceMemCapacity,
+ "nature" to resourceNature,
+ "deadline" to resourceDeadline,
)
override fun init(context: InitContext): ReadContext {
@@ -112,6 +116,13 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("requiredMemory"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("nature"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
)
.named("resource")
@@ -142,6 +153,13 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("mem_capacity"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("nature"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
)
.named("resource")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
index 5f02ea1e..866b304e 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt
@@ -43,6 +43,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
private var localCpuCount = 0
private var localCpuCapacity = 0.0
private var localMemCapacity = 0.0
+ private var localNature: String? = null
+ private var localDeadline = -1L
/**
* Root converter for the record.
@@ -95,6 +97,18 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
localMemCapacity = value.toDouble()
}
}
+ "nature" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localNature = value.toStringUsingUTF8()
+ }
+ }
+ "deadline" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localDeadline = value
+ }
+ }
else -> error("Unknown column $type")
}
}
@@ -106,6 +120,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
localCpuCount = 0
localCpuCapacity = 0.0
localMemCapacity = 0.0
+ localNature = null
+ localDeadline = -1
}
override fun end() {}
@@ -121,6 +137,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
localCpuCount,
localCpuCapacity,
localMemCapacity,
+ localNature,
+ localDeadline,
)
override fun getRootConverter(): GroupConverter = root
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
index e5822b0c..c3e984fb 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt
@@ -83,6 +83,18 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
consumer.addLong(record.memCapacity.roundToLong())
consumer.endField("mem_capacity", 5)
+ record.nature?.let {
+ consumer.startField("nature", 6)
+ consumer.addBinary(Binary.fromCharSequence(it))
+ consumer.endField("nature", 6)
+ }
+
+ if (record.deadline != -1L) {
+ consumer.startField("deadline", 7)
+ consumer.addLong(record.deadline)
+ consumer.endField("deadline", 7)
+ }
+
consumer.endMessage()
}
@@ -114,6 +126,13 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("mem_capacity"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("nature"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("deadline"),
)
.named("resource")
}
diff --git a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet
index 5053a192..c73177e2 100644
--- a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet
+++ b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet
Binary files differ