From ff94e63d5f80df317505c7ae0c6a5465f9a0c1f5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 13 Oct 2021 11:57:16 +0200 Subject: feat(trace): Add column for CPU capacity in OpenDC format This change adds a new column to resource table of the OpenDC trace format for the CPU capacity provisioned for a virtual machine, so that this capacity can be assigned to the virtual machine during simulation. --- .../org/opendc/trace/opendc/OdcVmResourceTableReader.kt | 8 +++++++- .../org/opendc/trace/opendc/OdcVmResourceTableWriter.kt | 1 + .../kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 4 +++- .../kotlin/org/opendc/trace/tools/TraceConverter.kt | 17 ++++++++++------- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index d93929aa..ffbdc440 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -69,6 +69,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) COL_CPU_COUNT -> getInt(index) + COL_CPU_CAPACITY -> getDouble(index) COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } @@ -95,6 +96,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0 COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } @@ -115,6 +117,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos() RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() else -> -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 9b32f8fd..886f3d54 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -48,7 +48,7 @@ public class OdcVmTraceFormat : TraceFormat { override fun create(path: Path) { // Construct directory containing the trace files - Files.createDirectory(path) + Files.createDirectories(path) val tables = getTables(path) @@ -68,6 +68,7 @@ public class OdcVmTraceFormat : TraceFormat { RESOURCE_START_TIME, RESOURCE_STOP_TIME, RESOURCE_CPU_COUNT, + RESOURCE_CPU_CAPACITY, RESOURCE_MEM_CAPACITY, ) ) @@ -138,6 +139,7 @@ public class OdcVmTraceFormat : TraceFormat { .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() .requiredInt("cpu_count") + .requiredDouble("cpu_capacity") .requiredLong("mem_capacity") .endRecord() diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 6fad43be..75472dff 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -130,9 +130,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { while (hasNextRow) { var id: String - var numCpus = Int.MIN_VALUE - var memCapacity = Double.MIN_VALUE - var memUsage = Double.MIN_VALUE + var cpuCount = 0 + var cpuCapacity = 0.0 + var memCapacity = 0.0 + var memUsage = 0.0 var startTime = Long.MAX_VALUE var stopTime = Long.MIN_VALUE @@ -140,12 +141,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { id = reader.get(RESOURCE_ID) val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - - numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT)) - + cpuCount = max(cpuCount, reader.getInt(RESOURCE_CPU_COUNT)) + cpuCapacity = max(cpuCapacity, reader.getDouble(RESOURCE_CPU_CAPACITY)) memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY)) + if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } @@ -165,7 +167,8 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { writer.set(RESOURCE_ID, id) writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) - writer.setInt(RESOURCE_CPU_COUNT, numCpus) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) writer.endRow() } -- cgit v1.2.3