diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-09-05 15:17:58 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-05 15:17:58 +0200 |
| commit | 3f05c61faeb94a2f1c920d87a6ca8bde34d551e0 (patch) | |
| tree | e699b6293c1ef275213999743ae1b64a15b0d8f6 /opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc | |
| parent | fa6e850b11bf09947c4afc03f4af18f488436bbd (diff) | |
Sim trace update (#249)
* Started on reimplementing the SimTrace implementation
* updated trace format. Fragments now do not have a deadline, but a duration. The Fragments are executed in order.
Diffstat (limited to 'opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc')
10 files changed, 71 insertions, 69 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index baaa0690..9a826418 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -40,7 +40,7 @@ public val resourceClusterID: String = "cluster_id" * Start time for the resource. */ @JvmField -public val resourceStartTime: String = "start_time" +public val resourceSubmissionTime: String = "submission_time" /** * Start time for the resource. @@ -52,7 +52,7 @@ public val resourceCarbonIntensity: String = "carbon_intensity" * End time for the resource. */ @JvmField -public val resourceStopTime: String = "stop_time" +public val resourceDuration: String = "duration" /** * Number of CPUs for the resource. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt index d86a0466..55f26fa6 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt @@ -27,10 +27,10 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.TableReader import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime import java.time.Duration import java.time.Instant import java.util.UUID @@ -87,8 +87,8 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun resolve(name: String): Int { return when (name) { resourceID -> colID - resourceStartTime -> colStartTime - resourceStopTime -> colStopTime + resourceSubmissionTime -> colStartTime + resourceDuration -> colStopTime resourceCpuCount -> colCpuCount resourceMemCapacity -> colMemCapacity else -> -1 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt index a75da9d9..7ce1c11a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt @@ -31,12 +31,12 @@ import org.opendc.trace.TableWriter import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime import org.opendc.trace.conv.resourceStateCpuUsagePct import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader @@ -79,8 +79,8 @@ public class AzureTraceFormat : TraceFormat { TableDetails( listOf( TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStartTime, TableColumnType.Instant), - TableColumn(resourceStopTime, TableColumnType.Instant), + TableColumn(resourceSubmissionTime, TableColumnType.Instant), + TableColumn(resourceDuration, TableColumnType.Instant), TableColumn(resourceCpuCount, TableColumnType.Int), TableColumn(resourceMemCapacity, TableColumnType.Double), ), diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt index 34197d7f..9c489bfd 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt @@ -25,10 +25,10 @@ package org.opendc.trace.formats.opendc import org.opendc.trace.TableReader import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -57,8 +57,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 + private val colSubmissionTime = 1 + private val colDurationTime = 2 private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 @@ -66,8 +66,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R override fun resolve(name: String): Int { return when (name) { resourceID -> colID - resourceStartTime -> colStartTime - resourceStopTime -> colStopTime + resourceSubmissionTime -> colSubmissionTime + resourceDuration -> colDurationTime resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity @@ -94,7 +94,11 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + colDurationTime -> record.durationTime + else -> throw IllegalArgumentException("Invalid column") + } } override fun getFloat(index: Int): Float { @@ -128,8 +132,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - colStartTime -> record.startTime - colStopTime -> record.stopTime + colSubmissionTime -> record.submissionTime else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt index e0a11368..19409fa7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt @@ -26,10 +26,10 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.Resource import java.time.Duration import java.time.Instant @@ -44,8 +44,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour */ private var localIsActive = false private var localId: String = "" - private var localStartTime: Instant = Instant.MIN - private var localStopTime: Instant = Instant.MIN + private var localSubmissionTime: Instant = Instant.MIN + private var localDuration: Long = 0L private var localCpuCount: Int = 0 private var localCpuCapacity: Double = Double.NaN private var localMemCapacity: Double = Double.NaN @@ -53,8 +53,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour override fun startRow() { localIsActive = true localId = "" - localStartTime = Instant.MIN - localStopTime = Instant.MIN + localSubmissionTime = Instant.MIN + localDuration = 0L localCpuCount = 0 localCpuCapacity = Double.NaN localMemCapacity = Double.NaN @@ -63,14 +63,14 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour override fun endRow() { check(localIsActive) { "No active row" } localIsActive = false - writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity)) + writer.write(Resource(localId, localSubmissionTime, localDuration, localCpuCount, localCpuCapacity, localMemCapacity)) } override fun resolve(name: String): Int { return when (name) { resourceID -> colID - resourceStartTime -> colStartTime - resourceStopTime -> colStopTime + resourceSubmissionTime -> colSubmissionTime + resourceDuration -> colDuration resourceCpuCount -> colCpuCount resourceCpuCapacity -> colCpuCapacity resourceMemCapacity -> colMemCapacity @@ -100,7 +100,11 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour index: Int, value: Long, ) { - throw IllegalArgumentException("Invalid column or type [index $index]") + check(localIsActive) { "No active row" } + when (index) { + colDuration -> localDuration = value + else -> throw IllegalArgumentException("Invalid column index $index") + } } override fun setFloat( @@ -146,8 +150,7 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour ) { check(localIsActive) { "No active row" } when (index) { - colStartTime -> localStartTime = value - colStopTime -> localStopTime = value + colSubmissionTime -> localSubmissionTime = value else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -189,8 +192,8 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour } private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 + private val colSubmissionTime = 1 + private val colDuration = 2 private val colCpuCount = 3 private val colCpuCapacity = 4 private val colMemCapacity = 5 diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt index 2e2617ef..b75cf091 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt @@ -39,13 +39,13 @@ import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime import org.opendc.trace.conv.resourceStateCpuUsage import org.opendc.trace.conv.resourceStateDuration import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport @@ -95,8 +95,8 @@ public class OdcVmTraceFormat : TraceFormat { TableDetails( listOf( TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStartTime, TableColumnType.Instant), - TableColumn(resourceStopTime, TableColumnType.Instant), + TableColumn(resourceSubmissionTime, TableColumnType.Instant), + TableColumn(resourceDuration, TableColumnType.Long), TableColumn(resourceCpuCount, TableColumnType.Int), TableColumn(resourceCpuCapacity, TableColumnType.Double), TableColumn(resourceMemCapacity, TableColumnType.Double), diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt index e8efe60f..ea404c65 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt @@ -29,8 +29,8 @@ import java.time.Instant */ internal data class Resource( val id: String, - val startTime: Instant, - val stopTime: Instant, + val submissionTime: Instant, + val durationTime: Long, val cpuCount: Int, val cpuCapacity: Double, val memCapacity: Double, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt index 75238344..e5a28a12 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt @@ -33,10 +33,10 @@ import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn import org.opendc.trace.conv.resourceCpuCapacity import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceDuration import org.opendc.trace.conv.resourceID import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.conv.resourceSubmissionTime /** * A [ReadSupport] instance for [Resource] objects. @@ -48,10 +48,9 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read private val fieldMap = mapOf( "id" to resourceID, - "submissionTime" to resourceStartTime, - "start_time" to resourceStartTime, - "endTime" to resourceStopTime, - "stop_time" to resourceStopTime, + "submissionTime" to resourceSubmissionTime, + "submission_time" to resourceSubmissionTime, + "duration" to resourceDuration, "maxCores" to resourceCpuCount, "cpu_count" to resourceCpuCount, "cpu_capacity" to resourceCpuCapacity, @@ -106,8 +105,7 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read .named("submissionTime"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("endTime"), + .named("duration"), Types .required(PrimitiveType.PrimitiveTypeName.INT32) .named("maxCores"), @@ -131,11 +129,10 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read Types .required(PrimitiveType.PrimitiveTypeName.INT64) .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), + .named("submission_time"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), + .named("duration"), Types .required(PrimitiveType.PrimitiveTypeName.INT32) .named("cpu_count"), diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt index 2e32c2e2..5f02ea1e 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt @@ -38,8 +38,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali * State of current record being read. */ private var localId = "" - private var localStartTime = Instant.MIN - private var localStopTime = Instant.MIN + private var localSubmissionTime = Instant.MIN + private var localDuration = 0L private var localCpuCount = 0 private var localCpuCapacity = 0.0 private var localMemCapacity = 0.0 @@ -61,16 +61,16 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali localId = value.toStringUsingUTF8() } } - "start_time", "submissionTime" -> + "submission_time", "submissionTime" -> object : PrimitiveConverter() { override fun addLong(value: Long) { - localStartTime = Instant.ofEpochMilli(value) + localSubmissionTime = Instant.ofEpochMilli(value) } } - "stop_time", "endTime" -> + "duration" -> object : PrimitiveConverter() { override fun addLong(value: Long) { - localStopTime = Instant.ofEpochMilli(value) + localDuration = value } } "cpu_count", "maxCores" -> @@ -101,8 +101,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali override fun start() { localId = "" - localStartTime = Instant.MIN - localStopTime = Instant.MIN + localSubmissionTime = Instant.MIN + localDuration = 0L localCpuCount = 0 localCpuCapacity = 0.0 localMemCapacity = 0.0 @@ -116,8 +116,8 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali override fun getCurrentRecord(): Resource = Resource( localId, - localStartTime, - localStopTime, + localSubmissionTime, + localDuration, localCpuCount, localCpuCapacity, localMemCapacity, diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt index a9937ffd..e5822b0c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt @@ -63,13 +63,13 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { consumer.addBinary(Binary.fromCharSequence(record.id)) consumer.endField("id", 0) - consumer.startField("start_time", 1) - consumer.addLong(record.startTime.toEpochMilli()) - consumer.endField("start_time", 1) + consumer.startField("submission_time", 1) + consumer.addLong(record.submissionTime.toEpochMilli()) + consumer.endField("submission_time", 1) - consumer.startField("stop_time", 2) - consumer.addLong(record.stopTime.toEpochMilli()) - consumer.endField("stop_time", 2) + consumer.startField("duration", 2) + consumer.addLong(record.durationTime) + consumer.endField("duration", 2) consumer.startField("cpu_count", 3) consumer.addInteger(record.cpuCount) @@ -101,11 +101,10 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { Types .required(PrimitiveType.PrimitiveTypeName.INT64) .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), + .named("submission_time"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), + .named("duration"), Types .required(PrimitiveType.PrimitiveTypeName.INT32) .named("cpu_count"), |
