summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-16 12:34:53 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:52 +0200
commit474044649a67cfcc857615b6a0f8387a2954abbd (patch)
tree5bb6bf9b0ca0978a47ac50ced85c245588465daa /opendc-trace/opendc-trace-opendc
parent9b25eef67911d0aec6a36c82a34cd0e39b13b073 (diff)
feat(trace): Update OpenDC VM trace format
This change optimizes the OpenDC VM trace format by removing unnecessary columns as well as optimizing the writer settings. The new implementation still supports reading the old trace format in case users run OpenDC with older workload traces.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt2
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt54
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt51
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt16
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt26
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet)bin1582 -> 1582 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet)bin83524 -> 83524 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquetbin0 -> 65174 bytes
10 files changed, 113 insertions, 40 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
index 32a71052..bee4ba7e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
@@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_USAGE,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 8850ad39..df3bcfa6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_USAGE -> true
else -> false
}
@@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_ID -> record[COL_ID].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(column: TableColumn<Int>): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
-
return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getDouble(column: TableColumn<Double>): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
override fun toString(): String = "OdcVmResourceStateTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ COL_DURATION = schema.getField("duration").pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_TIMESTAMP = -1
+ private var COL_DURATION = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_CPU_USAGE = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
index 9927afee..b1456560 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
@@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table {
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY,
)
override fun newReader(): TableReader {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index fe4379e6..c52da62d 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -46,7 +59,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_ID -> record[COL_ID].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
+ RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
+ RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
}
override fun toString(): String = "OdcVmResourceTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema")
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_START_TIME = -1
+ private var COL_STOP_TIME = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_MEM_CAPACITY = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index b69b5edf..8edba725 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.net.URL
import java.nio.file.Paths
import kotlin.io.path.exists
@@ -57,10 +58,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
+ .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("cpu_count")
+ .requiredLong("mem_capacity")
.endRecord()
/**
@@ -72,11 +73,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("time")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
.requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_usage")
.endRecord()
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 21f1174c..42eb369e 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.*
import java.io.File
import java.net.URL
@@ -38,13 +40,13 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTraceExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertDoesNotThrow { format.open(url) }
}
@Test
fun testTraceDoesNotExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertThrows<IllegalArgumentException> {
format.open(URL(url.toString() + "help"))
}
@@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTables() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
@@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
assertNotNull(table)
@@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableDoesNotExist() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertFalse(trace.containsTable("test"))
assertNull(trace.getTable("test"))
}
- @Test
- fun testResources() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testResources(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
@@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
- @Test
- fun testSmoke() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testSmoke(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
index d6ff09d8..d6ff09d8 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
index 5b6fa6b7..5b6fa6b7 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
Binary files differ