From 474044649a67cfcc857615b6a0f8387a2954abbd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Sep 2021 12:34:53 +0200 Subject: feat(trace): Update OpenDC VM trace format This change optimizes the OpenDC VM trace format by removing unnecessary columns as well as optimizing the writer settings. The new implementation still supports reading the old trace format in case users run OpenDC with older workload traces. --- .../compute/workload/ComputeWorkloadLoader.kt | 6 +- .../compute/workload/trace/TraceConverter.kt | 86 +++++++++++++-------- .../experiments/capelin/CapelinIntegrationTest.kt | 30 +++---- .../resources/trace/bitbrains-small/meta.parquet | Bin 2081 -> 2099 bytes .../resources/trace/bitbrains-small/trace.parquet | Bin 1647189 -> 1125930 bytes .../kotlin/org/opendc/trace/ResourceColumns.kt | 2 +- .../org/opendc/trace/ResourceStateColumns.kt | 2 +- .../opendc/trace/azure/AzureResourceStateTable.kt | 2 +- .../org/opendc/trace/azure/AzureResourceTable.kt | 2 +- .../opendc/trace/azure/AzureResourceTableReader.kt | 6 +- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 2 +- .../bitbrains/BitbrainsExResourceStateTable.kt | 4 +- .../BitbrainsExResourceStateTableReader.kt | 8 +- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 4 +- .../bitbrains/BitbrainsResourceStateTableReader.kt | 6 +- .../opendc/trace/opendc/OdcVmResourceStateTable.kt | 2 +- .../trace/opendc/OdcVmResourceStateTableReader.kt | 54 ++++++++++--- .../org/opendc/trace/opendc/OdcVmResourceTable.kt | 4 +- .../trace/opendc/OdcVmResourceTableReader.kt | 51 ++++++++++-- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 16 ++-- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 26 ++++--- .../src/test/resources/trace-v2.0/meta.parquet | Bin 0 -> 1582 bytes .../src/test/resources/trace-v2.0/trace.parquet | Bin 0 -> 83524 bytes .../src/test/resources/trace-v2.1/meta.parquet | Bin 0 -> 1679 bytes .../src/test/resources/trace-v2.1/trace.parquet | Bin 0 -> 65174 bytes .../src/test/resources/trace/meta.parquet | Bin 1582 -> 0 bytes .../src/test/resources/trace/trace.parquet | Bin 83524 -> 0 bytes traces/bitbrains-small/meta.parquet | Bin 2140 -> 2099 bytes traces/bitbrains-small/trace.parquet | Bin 1610917 -> 1125930 bytes 29 files changed, 203 insertions(+), 110 deletions(-) create mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet create mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet create mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet create mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet delete mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet delete mode 100644 opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index afc0fce9..c92b212f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -65,7 +65,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val id = reader.get(RESOURCE_STATE_ID) val time = reader.get(RESOURCE_STATE_TIMESTAMP) val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( @@ -75,7 +75,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { cores ) - fragments.getOrPut(id) { mutableListOf() }.add(fragment) + fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment) } fragments @@ -103,7 +103,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val submissionTime = reader.get(RESOURCE_START_TIME) val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_NCPUS) + val maxCores = reader.getInt(RESOURCE_CPU_COUNT) val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt index 50f3a669..2d570787 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workload.vm.trace +package org.opendc.compute.workload.trace import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument @@ -42,6 +42,7 @@ import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File import java.util.* +import kotlin.math.abs import kotlin.math.max import kotlin.math.min import kotlin.math.roundToLong @@ -112,16 +113,21 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val selectedVms = metaWriter.use { convertResources(trace, it) } + if (selectedVms.isEmpty()) { + logger.warn { "No VMs selected" } + return + } + logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) .withCompressionCodec(CompressionCodecName.ZSTD) - .enableDictionaryEncoding() - .enablePageWriteChecksum() + .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) .withBloomFilterNDV("id", selectedVms.size.toLong()) + .enableValidation() .build() val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } @@ -154,7 +160,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { @@ -172,10 +178,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) builder["id"] = id - builder["submissionTime"] = startTime - builder["endTime"] = stopTime - builder["maxCores"] = numCpus - builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + builder["start_time"] = startTime + builder["stop_time"] = stopTime + builder["cpu_count"] = numCpus + builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() logger.info { "Selecting VM $id" } @@ -194,44 +200,58 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var hasNextRow = reader.nextRow() var count = 0 + var lastId: String? = null + var lastTimestamp = 0L while (hasNextRow) { - var lastTimestamp = Long.MIN_VALUE + val id = reader.get(RESOURCE_STATE_ID) - do { - val id = reader.get(RESOURCE_STATE_ID) + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue + } - if (id !in selectedVms) { - hasNextRow = reader.nextRow() - continue - } + val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - builder["id"] = id + val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + var timestamp = startTimestamp + var duration: Long - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L + // Check whether the previous entry is from a different VM + if (id != lastId) { + lastTimestamp = timestamp - 5 * 60 * 1000L + } + + do { + timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + + duration = timestamp - lastTimestamp + hasNextRow = reader.nextRow() + + if (!hasNextRow) { + break } - val duration = timestamp - lastTimestamp - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val flops = (cpuUsage * duration / 1000.0).roundToLong() + val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && + cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + } while (shouldContinue) - builder["time"] = timestamp - builder["duration"] = duration - builder["cores"] = cores - builder["cpuUsage"] = cpuUsage - builder["flops"] = flops + val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - writer.write(builder.build()) + builder["id"] = id + builder["timestamp"] = startTimestamp + builder["duration"] = duration + builder["cpu_count"] = cpuCount + builder["cpu_usage"] = cpuUsage - lastTimestamp = timestamp - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + writer.write(builder.build()) count++ + + lastId = id + lastTimestamp = timestamp } return count diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 140a84db..ac2ea646 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -117,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -161,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -210,10 +210,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -253,11 +253,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet index ee76d38f..da6e5330 100644 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet differ diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet index 9b1cde13..fe0a254c 100644 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet differ diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index e2e5ea6d..219002e0 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -47,7 +47,7 @@ public val RESOURCE_STOP_TIME: TableColumn = TableColumn("resource:stop * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_NCPUS: TableColumn = intColumn("resource:num_cpus") +public val RESOURCE_CPU_COUNT: TableColumn = intColumn("resource:cpu_count") /** * Memory capacity for the resource in KB. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index 1933967e..b683923b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -60,7 +60,7 @@ public val RESOURCE_STATE_POWERED_ON: TableColumn = booleanColumn("reso * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_STATE_NCPUS: TableColumn = intColumn("resource_state:ncpus") +public val RESOURCE_STATE_CPU_COUNT: TableColumn = intColumn("resource_state:cpu_count") /** * Total CPU capacity of the resource in MHz. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index 189ab52a..84c9b347 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -68,9 +68,9 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt index d9f6f156..96ee3158 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt @@ -38,7 +38,7 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, + RESOURCE_CPU_COUNT, RESOURCE_MEM_CAPACITY ) diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index d3970b07..5ea97483 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -67,7 +67,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -78,7 +78,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> id RESOURCE_START_TIME -> startTime RESOURCE_STOP_TIME -> stopTime - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -93,7 +93,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_NCPUS -> cpuCores + RESOURCE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 20375547..e5735f0d 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -87,7 +87,7 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, - { assertEquals(1, reader.getInt(RESOURCE_NCPUS)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index 4db2bace..4a60dff3 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -50,7 +50,7 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_CLUSTER_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -77,9 +77,9 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 6fe5d397..f1cf7307 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -81,7 +81,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB } } @@ -93,7 +93,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR RESOURCE_STATE_ID -> true RESOURCE_STATE_CLUSTER_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -111,7 +111,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR RESOURCE_STATE_ID -> id RESOURCE_STATE_CLUSTER_ID -> cluster RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) @@ -134,7 +134,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index c9e5954d..7241b18b 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -50,7 +50,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path override val columns: List> = listOf( RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -78,9 +78,9 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index dab784c2..56e66f5c 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -115,7 +115,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return when (column) { RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -133,7 +133,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, val res: Any? = when (column) { RESOURCE_STATE_ID -> partition RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct @@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 32a71052..bee4ba7e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 8850ad39..df3bcfa6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } @@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(column: TableColumn): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int + RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getDouble(column: TableColumn): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } override fun toString(): String = "OdcVmResourceStateTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + COL_DURATION = schema.getField("duration").pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var COL_ID = -1 + private var COL_TIMESTAMP = -1 + private var COL_DURATION = -1 + private var COL_CPU_COUNT = -1 + private var COL_CPU_USAGE = -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index 9927afee..b1456560 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, ) override fun newReader(): TableReader { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index fe4379e6..c52da62d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_ID -> record[COL_ID].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record["maxCores"] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB + RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader { format.open(URL(url.toString() + "help")) } @@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) @@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val table = format.open(url).getTable(TABLE_RESOURCE_STATES) assertNotNull(table) @@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest { @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertFalse(trace.containsTable("test")) assertNull(trace.getTable("test")) } - @Test - fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testResources(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() @@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest { reader.close() } - @Test - fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testSmoke(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet new file mode 100644 index 00000000..d6ff09d8 Binary files /dev/null and b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet differ diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet new file mode 100644 index 00000000..5b6fa6b7 Binary files /dev/null and b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet differ diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet new file mode 100644 index 00000000..d8184945 Binary files /dev/null and b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet differ diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet new file mode 100644 index 00000000..00ab5835 Binary files /dev/null and b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet differ diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet deleted file mode 100644 index d6ff09d8..00000000 Binary files a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet and /dev/null differ diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet deleted file mode 100644 index 5b6fa6b7..00000000 Binary files a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet and /dev/null differ diff --git a/traces/bitbrains-small/meta.parquet b/traces/bitbrains-small/meta.parquet index 43f51cb8..da6e5330 100644 Binary files a/traces/bitbrains-small/meta.parquet and b/traces/bitbrains-small/meta.parquet differ diff --git a/traces/bitbrains-small/trace.parquet b/traces/bitbrains-small/trace.parquet index f4dd5a50..fe0a254c 100644 Binary files a/traces/bitbrains-small/trace.parquet and b/traces/bitbrains-small/trace.parquet differ -- cgit v1.2.3