summaryrefslogtreecommitdiff
path: root/opendc-trace
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-16 12:34:53 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:52 +0200
commit474044649a67cfcc857615b6a0f8387a2954abbd (patch)
tree5bb6bf9b0ca0978a47ac50ced85c245588465daa /opendc-trace
parent9b25eef67911d0aec6a36c82a34cd0e39b13b073 (diff)
feat(trace): Update OpenDC VM trace format
This change optimizes the OpenDC VM trace format by removing unnecessary columns as well as optimizing the writer settings. The new implementation still supports reading the old trace format in case users run OpenDC with older workload traces.
Diffstat (limited to 'opendc-trace')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt2
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt2
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt6
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt4
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt8
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt4
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt6
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt2
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt54
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt51
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt16
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt26
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet)bin1582 -> 1582 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet)bin83524 -> 83524 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquetbin0 -> 65174 bytes
20 files changed, 132 insertions, 59 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index e2e5ea6d..219002e0 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -47,7 +47,7 @@ public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count")
/**
* Memory capacity for the resource in KB.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
index 1933967e..b683923b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -60,7 +60,7 @@ public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("reso
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_STATE_NCPUS: TableColumn<Int> = intColumn("resource_state:ncpus")
+public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count")
/**
* Total CPU capacity of the resource in MHz.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
index 189ab52a..84c9b347 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
@@ -68,9 +68,9 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
index d9f6f156..96ee3158 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTable.kt
@@ -38,7 +38,7 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
+ RESOURCE_CPU_COUNT,
RESOURCE_MEM_CAPACITY
)
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index d3970b07..5ea97483 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -67,7 +67,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -78,7 +78,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
RESOURCE_ID -> id
RESOURCE_START_TIME -> startTime
RESOURCE_STOP_TIME -> stopTime
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -93,7 +93,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_NCPUS -> cpuCores
+ RESOURCE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 20375547..e5735f0d 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -87,7 +87,7 @@ class AzureTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
- { assertEquals(1, reader.getInt(RESOURCE_NCPUS)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
index 4db2bace..4a60dff3 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
@@ -50,7 +50,7 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_CLUSTER_ID,
RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_CAPACITY,
RESOURCE_STATE_CPU_USAGE,
RESOURCE_STATE_CPU_USAGE_PCT,
@@ -77,9 +77,9 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table {
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index 6fe5d397..f1cf7307 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -81,7 +81,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
COL_ID -> id = field.trim()
- COL_MEM_CAPACITY -> memCapacity = field.toDouble()
+ COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
}
}
@@ -93,7 +93,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
RESOURCE_STATE_ID -> true
RESOURCE_STATE_CLUSTER_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_CAPACITY -> true
RESOURCE_STATE_CPU_USAGE -> true
RESOURCE_STATE_CPU_USAGE_PCT -> true
@@ -111,7 +111,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
RESOURCE_STATE_ID -> id
RESOURCE_STATE_CLUSTER_ID -> cluster
RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
@@ -134,7 +134,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index c9e5954d..7241b18b 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -50,7 +50,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path
override val columns: List<TableColumn<*>> = listOf(
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_CAPACITY,
RESOURCE_STATE_CPU_USAGE,
RESOURCE_STATE_CPU_USAGE_PCT,
@@ -78,9 +78,9 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path
delegate.close()
delegate = nextDelegate()
+ this.delegate = delegate
}
- this.delegate = delegate
return delegate != null
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index dab784c2..56e66f5c 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -115,7 +115,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return when (column) {
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_CAPACITY -> true
RESOURCE_STATE_CPU_USAGE -> true
RESOURCE_STATE_CPU_USAGE_PCT -> true
@@ -133,7 +133,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
RESOURCE_STATE_CPU_USAGE -> cpuUsage
RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
@@ -156,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
index 32a71052..bee4ba7e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
@@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_USAGE,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 8850ad39..df3bcfa6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_USAGE -> true
else -> false
}
@@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_ID -> record[COL_ID].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(column: TableColumn<Int>): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
-
return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getDouble(column: TableColumn<Double>): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
override fun toString(): String = "OdcVmResourceStateTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ COL_DURATION = schema.getField("duration").pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_TIMESTAMP = -1
+ private var COL_DURATION = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_CPU_USAGE = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
index 9927afee..b1456560 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
@@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table {
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY,
)
override fun newReader(): TableReader {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index fe4379e6..c52da62d 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -46,7 +59,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_ID -> record[COL_ID].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
+ RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
+ RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
}
override fun toString(): String = "OdcVmResourceTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema")
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_START_TIME = -1
+ private var COL_STOP_TIME = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_MEM_CAPACITY = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index b69b5edf..8edba725 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.net.URL
import java.nio.file.Paths
import kotlin.io.path.exists
@@ -57,10 +58,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
+ .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("cpu_count")
+ .requiredLong("mem_capacity")
.endRecord()
/**
@@ -72,11 +73,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("time")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
.requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_usage")
.endRecord()
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 21f1174c..42eb369e 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.*
import java.io.File
import java.net.URL
@@ -38,13 +40,13 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTraceExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertDoesNotThrow { format.open(url) }
}
@Test
fun testTraceDoesNotExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertThrows<IllegalArgumentException> {
format.open(URL(url.toString() + "help"))
}
@@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTables() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
@@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
assertNotNull(table)
@@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableDoesNotExist() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertFalse(trace.containsTable("test"))
assertNull(trace.getTable("test"))
}
- @Test
- fun testResources() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testResources(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
@@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
- @Test
- fun testSmoke() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testSmoke(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
index d6ff09d8..d6ff09d8 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
index 5b6fa6b7..5b6fa6b7 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
Binary files differ