summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt2
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt54
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt51
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt16
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt26
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet)bin1582 -> 1582 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet (renamed from opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet)bin83524 -> 83524 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquetbin0 -> 65174 bytes
10 files changed, 113 insertions, 40 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
index 32a71052..bee4ba7e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt
@@ -38,7 +38,7 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table {
RESOURCE_STATE_ID,
RESOURCE_STATE_TIMESTAMP,
RESOURCE_STATE_DURATION,
- RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_COUNT,
RESOURCE_STATE_CPU_USAGE,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 8850ad39..df3bcfa6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,8 +38,20 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -47,7 +60,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
RESOURCE_STATE_ID -> true
RESOURCE_STATE_TIMESTAMP -> true
RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
+ RESOURCE_STATE_CPU_COUNT -> true
RESOURCE_STATE_CPU_USAGE -> true
else -> false
}
@@ -58,11 +71,11 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_STATE_ID -> record["id"].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record["time"] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record["duration"] as Long)
- RESOURCE_STATE_NCPUS -> record["cores"]
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_ID -> record[COL_ID].toString()
+ RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
+ RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
+ RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -76,9 +89,8 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(column: TableColumn<Int>): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
-
return when (column) {
- RESOURCE_STATE_NCPUS -> record["cores"] as Int
+ RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +102,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getDouble(column: TableColumn<Double>): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record["cpuUsage"] as Number).toDouble()
+ RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +112,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
override fun toString(): String = "OdcVmResourceStateTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ COL_DURATION = schema.getField("duration").pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_TIMESTAMP = -1
+ private var COL_DURATION = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_CPU_USAGE = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
index 9927afee..b1456560 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt
@@ -38,8 +38,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table {
RESOURCE_ID,
RESOURCE_START_TIME,
RESOURCE_STOP_TIME,
- RESOURCE_NCPUS,
- RESOURCE_MEM_CAPACITY
+ RESOURCE_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY,
)
override fun newReader(): TableReader {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index fe4379e6..c52da62d 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.opendc
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -36,8 +37,20 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
@@ -46,7 +59,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
RESOURCE_STOP_TIME -> true
- RESOURCE_NCPUS -> true
+ RESOURCE_CPU_COUNT -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
}
@@ -57,10 +70,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- RESOURCE_ID -> record["id"].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_ID -> record[COL_ID].toString()
+ RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
+ RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -77,7 +90,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_NCPUS -> record["maxCores"] as Int
+ RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -90,7 +103,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
+ RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -100,4 +113,26 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
}
override fun toString(): String = "OdcVmResourceTableReader"
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ COL_ID = schema.getField("id").pos()
+ COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema")
+ }
+ }
+
+ private var COL_ID = -1
+ private var COL_START_TIME = -1
+ private var COL_STOP_TIME = -1
+ private var COL_CPU_COUNT = -1
+ private var COL_MEM_CAPACITY = -1
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index b69b5edf..8edba725 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -25,6 +25,7 @@ package org.opendc.trace.opendc
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.net.URL
import java.nio.file.Paths
import kotlin.io.path.exists
@@ -57,10 +58,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("submissionTime")
- .requiredLong("endTime")
- .requiredInt("maxCores")
- .requiredLong("requiredMemory")
+ .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
+ .requiredInt("cpu_count")
+ .requiredLong("mem_capacity")
.endRecord()
/**
@@ -72,11 +73,10 @@ public class OdcVmTraceFormat : TraceFormat {
.namespace("org.opendc.trace.opendc")
.fields()
.requiredString("id")
- .requiredLong("time")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
.requiredLong("duration")
- .requiredInt("cores")
- .requiredDouble("cpuUsage")
- .requiredLong("flops")
+ .requiredInt("cpu_count")
+ .requiredDouble("cpu_usage")
.endRecord()
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 21f1174c..42eb369e 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.*
import java.io.File
import java.net.URL
@@ -38,13 +40,13 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTraceExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertDoesNotThrow { format.open(url) }
}
@Test
fun testTraceDoesNotExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
assertThrows<IllegalArgumentException> {
format.open(URL(url.toString() + "help"))
}
@@ -52,7 +54,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTables() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
@@ -60,7 +62,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableExists() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val table = format.open(url).getTable(TABLE_RESOURCE_STATES)
assertNotNull(table)
@@ -69,16 +71,17 @@ internal class OdcVmTraceFormatTest {
@Test
fun testTableDoesNotExist() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ val url = File("src/test/resources/trace-v2.1").toURI().toURL()
val trace = format.open(url)
assertFalse(trace.containsTable("test"))
assertNull(trace.getTable("test"))
}
- @Test
- fun testResources() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testResources(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
@@ -98,9 +101,10 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
- @Test
- fun testSmoke() {
- val url = File("src/test/resources/trace").toURI().toURL()
+ @ParameterizedTest
+ @ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
+ fun testSmoke(name: String) {
+ val url = File("src/test/resources/$name").toURI().toURL()
val trace = format.open(url)
val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader()
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
index d6ff09d8..d6ff09d8 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/meta.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
index 5b6fa6b7..5b6fa6b7 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/resources/trace/trace.parquet
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.0/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/trace.parquet
Binary files differ