From 1e35c61cd31b8bfb33a6ccbb46b08c0466518e6c Mon Sep 17 00:00:00 2001 From: Sacheendra Talluri Date: Thu, 20 Mar 2025 10:16:46 +0100 Subject: Adds load shifting over time (#319) * Start time shifting * Existing experiments work with new columns * Remove unused traces dir * Update java to 21 LTS and jacoco to be compatible * Minimal working timeshifting * Timeshift scheduler linked as carbon receiver * Add basic tests for timeshift scheduler * Run spotless apply * Modify tarce format tests to support new fields * Change all mentions of java 19 to 21 * Add a deferAll option to workload to make all tasks deferrable * Run spotless apply * Copy traces from resources in web dockerfile --- .../org/opendc/trace/conv/ResourceColumns.kt | 14 +++++++++++- .../formats/opendc/OdcVmResourceTableReader.kt | 20 ++++++++++++++--- .../formats/opendc/OdcVmResourceTableWriter.kt | 25 ++++++++++++++++++++- .../trace/formats/opendc/OdcVmTraceFormat.kt | 4 ++++ .../trace/formats/opendc/parquet/Resource.kt | 2 ++ .../formats/opendc/parquet/ResourceReadSupport.kt | 18 +++++++++++++++ .../opendc/parquet/ResourceRecordMaterializer.kt | 18 +++++++++++++++ .../formats/opendc/parquet/ResourceWriteSupport.kt | 19 ++++++++++++++++ .../test/resources/opendc/trace-v2.1/tasks.parquet | Bin 4597 -> 5407 bytes 9 files changed, 115 insertions(+), 5 deletions(-) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 9a826418..d0f56bff 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -43,7 +43,7 @@ public val resourceClusterID: String = "cluster_id" public val resourceSubmissionTime: String = "submission_time" /** - * Start time for the resource. + * Carbon intensity of the resource. */ @JvmField public val resourceCarbonIntensity: String = "carbon_intensity" @@ -71,3 +71,15 @@ public val resourceCpuCapacity: String = "cpu_capacity" */ @JvmField public val resourceMemCapacity: String = "mem_capacity" + +/** + * Nature of the task. Delayable, interruptible, etc. + */ +@JvmField +public val resourceNature: String = "nature" + +/** + * Deadline of the task. + */ +@JvmField +public val resourceDeadline: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt index 9c489bfd..10f60658 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt @@ -25,9 +25,11 @@ package org.opendc.trace.formats.opendc import org.opendc.trace.TableReader import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader @@ -62,6 +64,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..colMemCapacity) { "Invalid column index" } - return false + require(index in 0..colDeadline) { "Invalid column index" } + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colNature -> record.nature == null + colDeadline -> record.deadline == -1L + else -> false + } } override fun getBoolean(index: Int): Boolean { @@ -97,6 +109,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.durationTime + colDeadline -> record.deadline else -> throw IllegalArgumentException("Invalid column") } } @@ -115,11 +128,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.id + colNature -> record.nature else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt index 19409fa7..2b8db7f1 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt @@ -26,9 +26,11 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import java.time.Duration @@ -49,6 +51,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } @@ -103,6 +122,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter localDuration = value + colDeadline -> localDeadline = value else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -133,6 +153,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter localId = value + colNature -> localNature = value else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -197,4 +218,6 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt index ea404c65..00922d4f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt @@ -34,4 +34,6 @@ internal data class Resource( val cpuCount: Int, val cpuCapacity: Double, val memCapacity: Double, + val nature: String? = null, + val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt index e5a28a12..75a2bbb2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt @@ -33,9 +33,11 @@ import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime /** @@ -56,6 +58,8 @@ internal class ResourceReadSupport(private val projection: List?) : Read "cpu_capacity" to resourceCpuCapacity, "requiredMemory" to resourceMemCapacity, "mem_capacity" to resourceMemCapacity, + "nature" to resourceNature, + "deadline" to resourceDeadline, ) override fun init(context: InitContext): ReadContext { @@ -112,6 +116,13 @@ internal class ResourceReadSupport(private val projection: List?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("requiredMemory"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") @@ -142,6 +153,13 @@ internal class ResourceReadSupport(private val projection: List?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt index 5f02ea1e..866b304e 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt @@ -43,6 +43,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali private var localCpuCount = 0 private var localCpuCapacity = 0.0 private var localMemCapacity = 0.0 + private var localNature: String? = null + private var localDeadline = -1L /** * Root converter for the record. @@ -95,6 +97,18 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity = value.toDouble() } } + "nature" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localNature = value.toStringUsingUTF8() + } + } + "deadline" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDeadline = value + } + } else -> error("Unknown column $type") } } @@ -106,6 +120,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localCpuCount = 0 localCpuCapacity = 0.0 localMemCapacity = 0.0 + localNature = null + localDeadline = -1 } override fun end() {} @@ -121,6 +137,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localCpuCount, localCpuCapacity, localMemCapacity, + localNature, + localDeadline, ) override fun getRootConverter(): GroupConverter = root diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt index e5822b0c..c3e984fb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt @@ -83,6 +83,18 @@ internal class ResourceWriteSupport : WriteSupport() { consumer.addLong(record.memCapacity.roundToLong()) consumer.endField("mem_capacity", 5) + record.nature?.let { + consumer.startField("nature", 6) + consumer.addBinary(Binary.fromCharSequence(it)) + consumer.endField("nature", 6) + } + + if (record.deadline != -1L) { + consumer.startField("deadline", 7) + consumer.addLong(record.deadline) + consumer.endField("deadline", 7) + } + consumer.endMessage() } @@ -114,6 +126,13 @@ internal class ResourceWriteSupport : WriteSupport() { Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") } diff --git a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet index 5053a192..c73177e2 100644 Binary files a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet and b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet differ -- cgit v1.2.3