diff options
| author | Sacheendra Talluri <sacheendra.t@gmail.com> | 2025-03-20 10:16:46 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-20 10:16:46 +0100 |
| commit | 1e35c61cd31b8bfb33a6ccbb46b08c0466518e6c (patch) | |
| tree | 261c84148cd045246bdc2ad7aa3c41524356b699 /opendc-trace/opendc-trace-api/src | |
| parent | 6211b887b68b3ebc9245fada1c0f36725955b052 (diff) | |
Adds load shifting over time (#319)
* Start time shifting
* Existing experiments work with new columns
* Remove unused traces dir
* Update java to 21 LTS and jacoco to be compatible
* Minimal working timeshifting
* Timeshift scheduler linked as carbon receiver
* Add basic tests for timeshift scheduler
* Run spotless apply
* Modify tarce format tests to support new fields
* Change all mentions of java 19 to 21
* Add a deferAll option to workload to make all tasks deferrable
* Run spotless apply
* Copy traces from resources in web dockerfile
Diffstat (limited to 'opendc-trace/opendc-trace-api/src')
9 files changed, 115 insertions, 5 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 9a826418..d0f56bff 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -43,7 +43,7 @@ public val resourceClusterID: String = "cluster_id" public val resourceSubmissionTime: String = "submission_time" /** - * Start time for the resource. + * Carbon intensity of the resource. */ @JvmField public val resourceCarbonIntensity: String = "carbon_intensity" @@ -71,3 +71,15 @@ public val resourceCpuCapacity: String = "cpu_capacity" */ @JvmField public val resourceMemCapacity: String = "mem_capacity" + +/** + * Nature of the task. Delayable, interruptible, etc. + */ +@JvmField +public val resourceNature: String = "nature" + +/** + * Deadline of the task. + */ +@JvmField +public val resourceDeadline: String = "deadline" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt index 9c489bfd..10f60658 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt @@ -25,9 +25,11 @@ package org.opendc.trace.formats.opendc import org.opendc.trace.TableReader import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader @@ -62,6 +64,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 + private val colNature = 6 + private val colDeadline = 7 override fun resolve(name: String): Int { return when (name) { @@ -71,13 +75,21 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..colMemCapacity) { "Invalid column index" } - return false + require(index in 0..colDeadline) { "Invalid column index" } + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + colNature -> record.nature == null + colDeadline -> record.deadline == -1L + else -> false + } } override fun getBoolean(index: Int): Boolean { @@ -97,6 +109,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { colDurationTime -> record.durationTime + colDeadline -> record.deadline else -> throw IllegalArgumentException("Invalid column") } } @@ -115,11 +128,12 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } - override fun getString(index: Int): String { + override fun getString(index: Int): String? { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { colID -> record.id + colNature -> record.nature else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt index 19409fa7..2b8db7f1 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt @@ -26,9 +26,11 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import java.time.Duration @@ -49,6 +51,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private var localCpuCount: Int = 0 private var localCpuCapacity: Double = Double.NaN private var localMemCapacity: Double = Double.NaN + private var localNature: String? = null + private var localDeadline: Long = -1 override fun startRow() { localIsActive = true @@ -58,12 +62,25 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour localCpuCount = 0 localCpuCapacity = Double.NaN localMemCapacity = Double.NaN + localNature = null + localDeadline = -1L } override fun endRow() { check(localIsActive) { "No active row" } localIsActive = false - writer.write(Resource(localId, localSubmissionTime, localDuration, localCpuCount, localCpuCapacity, localMemCapacity)) + writer.write( + Resource( + localId, + localSubmissionTime, + localDuration, + localCpuCount, + localCpuCapacity, + localMemCapacity, + localNature, + localDeadline, + ), + ) } override fun resolve(name: String): Int { @@ -74,6 +91,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity + resourceNature -> colNature + resourceDeadline -> colDeadline else -> -1 } } @@ -103,6 +122,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour check(localIsActive) { "No active row" } when (index) { colDuration -> localDuration = value + colDeadline -> localDeadline = value else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -133,6 +153,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour check(localIsActive) { "No active row" } when (index) { colID -> localId = value + colNature -> localNature = value else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -197,4 +218,6 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 + private val colNature = 6 + private val colDeadline = 7 } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt index b75cf091..e2013182 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt @@ -39,9 +39,11 @@ import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceStateCpuUsage import org.opendc.trace.conv.resourceStateDuration import org.opendc.trace.conv.resourceStateTimestamp @@ -100,6 +102,8 @@ public class OdcVmTraceFormat : TraceFormat { TableColumn(resourceCpuCount, TableColumnType.Int), TableColumn(resourceCpuCapacity, TableColumnType.Double), TableColumn(resourceMemCapacity, TableColumnType.Double), + TableColumn(resourceNature, TableColumnType.String), + TableColumn(resourceDeadline, TableColumnType.Long), ), ) TABLE_RESOURCE_STATES -> diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt index ea404c65..00922d4f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt @@ -34,4 +34,6 @@ internal data class Resource( val cpuCount: Int, val cpuCapacity: Double, val memCapacity: Double, + val nature: String? = null, + val deadline: Long = -1, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt index e5a28a12..75a2bbb2 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt @@ -33,9 +33,11 @@ import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDeadline import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceNature import org.opendc.trace.conv.resourceSubmissionTime /** @@ -56,6 +58,8 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read "cpu_capacity" to resourceCpuCapacity, "requiredMemory" to resourceMemCapacity, "mem_capacity" to resourceMemCapacity, + "nature" to resourceNature, + "deadline" to resourceDeadline, ) override fun init(context: InitContext): ReadContext { @@ -112,6 +116,13 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("requiredMemory"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") @@ -142,6 +153,13 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt index 5f02ea1e..866b304e 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt @@ -43,6 +43,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali private var localCpuCount = 0 private var localCpuCapacity = 0.0 private var localMemCapacity = 0.0 + private var localNature: String? = null + private var localDeadline = -1L /** * Root converter for the record. @@ -95,6 +97,18 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localMemCapacity = value.toDouble() } } + "nature" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localNature = value.toStringUsingUTF8() + } + } + "deadline" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDeadline = value + } + } else -> error("Unknown column $type") } } @@ -106,6 +120,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localCpuCount = 0 localCpuCapacity = 0.0 localMemCapacity = 0.0 + localNature = null + localDeadline = -1 } override fun end() {} @@ -121,6 +137,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localCpuCount, localCpuCapacity, localMemCapacity, + localNature, + localDeadline, ) override fun getRootConverter(): GroupConverter = root diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt index e5822b0c..c3e984fb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt @@ -83,6 +83,18 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { consumer.addLong(record.memCapacity.roundToLong()) consumer.endField("mem_capacity", 5) + record.nature?.let { + consumer.startField("nature", 6) + consumer.addBinary(Binary.fromCharSequence(it)) + consumer.endField("nature", 6) + } + + if (record.deadline != -1L) { + consumer.startField("deadline", 7) + consumer.addLong(record.deadline) + consumer.endField("deadline", 7) + } + consumer.endMessage() } @@ -114,6 +126,13 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("mem_capacity"), + Types + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("nature"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("deadline"), ) .named("resource") } diff --git a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet Binary files differindex 5053a192..c73177e2 100644 --- a/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet +++ b/opendc-trace/opendc-trace-api/src/test/resources/opendc/trace-v2.1/tasks.parquet |
