summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt6
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt40
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt38
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt21
4 files changed, 92 insertions, 13 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index b455a2cf..d45910c6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport())
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 47cce914..0d70446d 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Resource] objects.
*/
-internal class ResourceReadSupport : ReadSupport<Resource>() {
+internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "submissionTime" to RESOURCE_START_TIME,
+ "start_time" to RESOURCE_START_TIME,
+ "endTime" to RESOURCE_STOP_TIME,
+ "stop_time" to RESOURCE_STOP_TIME,
+ "maxCores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpu_capacity" to RESOURCE_CPU_CAPACITY,
+ "requiredMemory" to RESOURCE_MEM_CAPACITY,
+ "mem_capacity" to RESOURCE_MEM_CAPACITY,
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 17840ceb..97aa00b2 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [ResourceState] objects.
*/
-internal class ResourceStateReadSupport : ReadSupport<ResourceState>() {
+internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "time" to RESOURCE_STATE_TIMESTAMP,
+ "timestamp" to RESOURCE_STATE_TIMESTAMP,
+ "duration" to RESOURCE_STATE_DURATION,
+ "cores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
+ "cpu_usage" to RESOURCE_STATE_CPU_USAGE,
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index dec0fef9..1f4f6195 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("1023", reader.get(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
@@ -95,7 +96,7 @@ internal class OdcVmTraceFormatTest {
writer.endRow()
writer.close()
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest {
writer.endRow()
writer.close()
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -177,7 +186,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroupsEmpty() {
val path = Paths.get("src/test/resources/trace-v2.0")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS))
assertFalse(reader.nextRow())
reader.close()