diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-16 12:34:53 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-19 14:02:52 +0200 |
| commit | 474044649a67cfcc857615b6a0f8387a2954abbd (patch) | |
| tree | 5bb6bf9b0ca0978a47ac50ced85c245588465daa /opendc-trace/opendc-trace-opendc/src | |
| parent | 9b25eef67911d0aec6a36c82a34cd0e39b13b073 (diff) | |
feat(trace): Update OpenDC VM trace format
This change optimizes the OpenDC VM trace format by removing
unnecessary columns as well as optimizing the writer settings.
The new implementation still supports reading the old trace format in
case users run OpenDC with older workload traces.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
10 files changed, 113 insertions, 40 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 32a71052..bee4ba7e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 8850ad39..df3bcfa6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea RESOURCE_STATE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_NCPUS -> true + RESOURCE_STATE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } @@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record["id"].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long) - RESOURCE_STATE_NCPUS -> record["cores"] - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) + RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) + RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(column: TableColumn<Int>): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_NCPUS -> record["cores"] as Int + RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getDouble(column: TableColumn<Double>): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble() + RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } override fun toString(): String = "OdcVmResourceStateTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + COL_DURATION = schema.getField("duration").pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema", e) + } + } + + private var COL_ID = -1 + private var COL_TIMESTAMP = -1 + private var COL_DURATION = -1 + private var COL_CPU_COUNT = -1 + private var COL_CPU_USAGE = -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index 9927afee..b1456560 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_ID, RESOURCE_START_TIME, RESOURCE_STOP_TIME, - RESOURCE_NCPUS, - RESOURCE_MEM_CAPACITY + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, ) override fun newReader(): TableReader { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index fe4379e6..c52da62d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.trace.opendc +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader @@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G */ private var record: GenericRecord? = null + /** + * A flag to indicate that the columns have been initialized. + */ + private var hasInitializedColumns = false + override fun nextRow(): Boolean { - record = reader.read() + val record = reader.read() + this.record = record + + if (!hasInitializedColumns && record != null) { + initColumns(record.schema) + hasInitializedColumns = true + } + return record != null } @@ -46,7 +59,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G RESOURCE_ID -> true RESOURCE_START_TIME -> true RESOURCE_STOP_TIME -> true - RESOURCE_NCPUS -> true + RESOURCE_CPU_COUNT -> true RESOURCE_MEM_CAPACITY -> true else -> false } @@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_ID -> record["id"].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_ID -> record[COL_ID].toString() + RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_NCPUS -> record["maxCores"] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } @@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB + RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G } override fun toString(): String = "OdcVmResourceTableReader" + + /** + * Initialize the columns for the reader based on [schema]. + */ + private fun initColumns(schema: Schema) { + try { + COL_ID = schema.getField("id").pos() + COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + } catch (e: NullPointerException) { + // This happens when the field we are trying to access does not exist + throw IllegalArgumentException("Invalid schema") + } + } + + private var COL_ID = -1 + private var COL_START_TIME = -1 + private var COL_STOP_TIME = -1 + private var COL_CPU_COUNT = -1 + private var COL_MEM_CAPACITY = -1 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b69b5edf..8edba725 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -25,6 +25,7 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.net.URL import java.nio.file.Paths import kotlin.io.path.exists @@ -57,10 +58,10 @@ public class OdcVmTraceFormat : TraceFormat { .namespace("org.opendc.trace.opendc") .fields() .requiredString("id") - .requiredLong("submissionTime") - .requiredLong("endTime") - .requiredInt("maxCores") - .requiredLong("requiredMemory") + .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() + .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() + .requiredInt("cpu_count") + .requiredLong("mem_capacity") .endRecord() /** @@ -72,11 +73,10 @@ public class OdcVmTraceFormat : TraceFormat { .namespace("org.opendc.trace.opendc") .fields() .requiredString("id") - .requiredLong("time") + .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() .requiredLong("duration") - .requiredInt("cores") - .requiredDouble("cpuUsage") - .requiredLong("flops") + .requiredInt("cpu_count") + .requiredDouble("cpu_usage") .endRecord() } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 21f1174c..42eb369e 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -26,6 +26,8 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* import java.io.File import java.net.URL @@ -38,13 +40,13 @@ internal class OdcVmTraceFormatTest { @Test fun testTraceExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() assertDoesNotThrow { format.open(url) } } @Test fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() assertThrows<IllegalArgumentException> { format.open(URL(url.toString() + "help")) } @@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTables() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) @@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest { @Test fun testTableExists() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val table = format.open(url).getTable(TABLE_RESOURCE_STATES) assertNotNull(table) @@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest { @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace").toURI().toURL() + val url = File("src/test/resources/trace-v2.1").toURI().toURL() val trace = format.open(url) assertFalse(trace.containsTable("test")) assertNull(trace.getTable("test")) } - @Test - fun testResources() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testResources(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() @@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest { reader.close() } - @Test - fun testSmoke() { - val url = File("src/test/resources/trace").toURI().toURL() + @ParameterizedTest + @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) + fun testSmoke(name: String) { + val url = File("src/test/resources/$name").toURI().toURL() val trace = format.open(url) val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet Binary files differindex d6ff09d8..d6ff09d8 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet Binary files differindex 5b6fa6b7..5b6fa6b7 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet Binary files differnew file mode 100644 index 00000000..d8184945 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet Binary files differnew file mode 100644 index 00000000..00ab5835 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet |
