diff options
Diffstat (limited to 'opendc-trace')
20 files changed, 132 insertions, 59 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index e2e5ea6d..219002e0 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -47,7 +47,7 @@ public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count") /** * Memory capacity for the resource in KB. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index 1933967e..b683923b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -60,7 +60,7 @@ public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("reso * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus") +public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count") /** * Total CPU capacity of the resource in MHz. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index 189ab52a..84c9b347 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -68,9 +68,9 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt index d9f6f156..96ee3158 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt @@ -38,7 +38,7 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, + RESOURCE_CPU_COUNT, RESOURCE_MEM_CAPACITY ) diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index d3970b07..5ea97483 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -67,7 +67,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -78,7 +78,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe RESOURCE_ID -> id RESOURCE_START_TIME -> startTime RESOURCE_STOP_TIME -> stopTime - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -93,7 +93,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_NCPUS -> cpuCores + RESOURCE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 20375547..e5735f0d 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -87,7 +87,7 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, - { assertEquals(1, reader.getInt(RESOURCE_NCPUS)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt index 4db2bace..4a60dff3 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt @@ -50,7 +50,7 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_CLUSTER_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -77,9 +77,9 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table { delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 6fe5d397..f1cf7307 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -81,7 +81,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB } } @@ -93,7 +93,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR RESOURCE_STATE_ID -> true RESOURCE_STATE_CLUSTER_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -111,7 +111,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR RESOURCE_STATE_ID -> id RESOURCE_STATE_CLUSTER_ID -> cluster RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) @@ -134,7 +134,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index c9e5954d..7241b18b 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -50,7 +50,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path override val columns: List<TableColumn<*>> = listOf( RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_CAPACITY, RESOURCE_STATE_CPU_USAGE, RESOURCE_STATE_CPU_USAGE_PCT, @@ -78,9 +78,9 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path delegate.close() delegate = nextDelegate() + this.delegate = delegate } - this.delegate = delegate return delegate != null } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index dab784c2..56e66f5c 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -115,7 +115,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return when (column) { RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_CAPACITY -> true RESOURCE_STATE_CPU_USAGE -> true RESOURCE_STATE_CPU_USAGE_PCT -> true @@ -133,7 +133,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, val res: Any? = when (column) { RESOURCE_STATE_ID -> partition RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity RESOURCE_STATE_CPU_USAGE -> cpuUsage RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct @@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 32a71052..bee4ba7e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 8850ad39..df3bcfa6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } @@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(column: TableColumn<Int>): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int + RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getDouble(column: TableColumn<Double>): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } override fun toString(): String = "OdcVmResourceStateTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + COL_DURATION = schema.getField("duration").pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var COL_ID = -1 + private var COL_TIMESTAMP = -1 + private var COL_DURATION = -1 + private var COL_CPU_COUNT = -1 + private var COL_CPU_USAGE = -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index 9927afee..b1456560 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, ) override fun newReader(): TableReader { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index fe4379e6..c52da62d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -46,7 +59,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_ID -> record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_ID -> record[COL_ID].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_NCPUS -> record["maxCores"] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB + RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G } override fun toString(): String = "OdcVmResourceTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema") + } + } + + private var COL_ID = -1 + private var COL_START_TIME = -1 + private var COL_STOP_TIME = -1 + private var COL_CPU_COUNT = -1 + private var COL_MEM_CAPACITY = -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b69b5edf..8edba725 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -25,6 +25,7 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.net.URL import java.nio.file.Paths import kotlin.io.path.exists @@ -57,10 +58,10 @@ public class OdcVmTraceFormat : TraceFormat { .namespace("org.opendc.trace.opendc") .fields() .requiredString("id") - .requiredLong("submissionTime") - .requiredLong("endTime") - .requiredInt("maxCores") - .requiredLong("requiredMemory") + .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() + .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("cpu_count") + .requiredLong("mem_capacity") .endRecord() /** @@ -72,11 +73,10 @@ public class OdcVmTraceFormat : TraceFormat { .namespace("org.opendc.trace.opendc") .fields() .requiredString("id") - .requiredLong("time") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() .requiredLong("duration") - .requiredInt("cores") - .requiredDouble("cpuUsage") - .requiredLong("flops") + .requiredInt("cpu_count") + .requiredDouble("cpu_usage") .endRecord() } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 21f1174c..42eb369e 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* import java.io.File import java.net.URL @@ -38,13 +40,13 @@ internal class OdcVmTraceFormatTest { @Test fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() assertDoesNotThrow { format.open(url) } } @Test fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() assertThrows<IllegalArgumentException> { format.open(URL(url.toString() + "help")) } @@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) @@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val table = format.open(url).getTable(TABLE_RESOURCE_STATES) assertNotNull(table) @@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest { @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertFalse(trace.containsTable("test")) assertNull(trace.getTable("test")) } - @Test - fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testResources(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() @@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest { reader.close() } - @Test - fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testSmoke(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet Binary files differindex d6ff09d8..d6ff09d8 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet Binary files differindex 5b6fa6b7..5b6fa6b7 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet Binary files differnew file mode 100644 index 00000000..d8184945 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet Binary files differnew file mode 100644 index 00000000..00ab5835 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet |
