summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt 6
-rw-r--r-- opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt 4
-rw-r--r-- opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt 4
-rw-r--r-- opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt 4
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt 2
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt 4
-rw-r--r-- opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt 6
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt 6
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt 40
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt 38
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt 21
-rw-r--r-- opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt 2
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt 2
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt 4
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt 4
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt 39
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt 2
22 files changed, 159 insertions, 39 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index b0181cbc..05d0234a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -42,9 +42,11 @@ public interface Table {
public val partitionKeys: List<TableColumn<*>>
/**
- * Open a [TableReader] for this table.
+ * Open a [TableReader] for a projection of this table.
+ *
+ * @param projection The list of columns to fetch from the table or `null` if no projection is performed.
*/
- public fun newReader(): TableReader
+ public fun newReader(projection: List<TableColumn<*>>? = null): TableReader
/**
* Open a [TableWriter] for this table.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index 24551edb..b848e19a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
override val partitionKeys: List<TableColumn<*>>
get() = details.partitionKeys
- override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+ override fun newReader(projection: List<TableColumn<*>>?): TableReader {
+ return trace.format.newReader(trace.path, name, projection)
+ }
override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index f2e610db..47761e0f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.spi
+import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import java.nio.file.Path
@@ -68,10 +69,11 @@ public interface TraceFormat {
*
* @param path The path to the trace to open.
* @param table The name of the table to open a [TableReader] for.
+ * @param projection The list of [TableColumn]s to project or `null` if no projection is performed.
* @throws IllegalArgumentException If [table] does not exist.
* @return A [TableReader] instance for the table.
*/
- public fun newReader(path: Path, table: String): TableReader
+ public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader
/**
* Open a [TableWriter] for the specified [table].
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 8e3e60cc..73978990 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 56f9a940..263d26ce 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -57,7 +57,7 @@ class AzureTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
@@ -71,7 +71,7 @@ class AzureTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 11d21a04..82e454ad 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index e1e7604a..a374e951 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val vms = Files.walk(path, 1)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index 77429e3e..c944cb98 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/vm.txt")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 9309beb1..841801e6 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 63688523..8d9eab82 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 9bf28ad7..411d45d0 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class GwfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -73,7 +73,7 @@ internal class GwfTraceFormatTest {
@Test
fun testReadingRowWithDependencies() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
// Move to row 7
for (x in 1..6)
@@ -85,7 +85,7 @@ internal class GwfTraceFormatTest {
{ assertEquals("7", reader.get(TASK_ID)) },
{ assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf<String>("4", "5", "6"), reader.get(TASK_PARENTS)) },
+ { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) },
)
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index b455a2cf..d45910c6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport())
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 47cce914..0d70446d 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Resource] objects.
*/
-internal class ResourceReadSupport : ReadSupport<Resource>() {
+internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "submissionTime" to RESOURCE_START_TIME,
+ "start_time" to RESOURCE_START_TIME,
+ "endTime" to RESOURCE_STOP_TIME,
+ "stop_time" to RESOURCE_STOP_TIME,
+ "maxCores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpu_capacity" to RESOURCE_CPU_CAPACITY,
+ "requiredMemory" to RESOURCE_MEM_CAPACITY,
+ "mem_capacity" to RESOURCE_MEM_CAPACITY,
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 17840ceb..97aa00b2 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [ResourceState] objects.
*/
-internal class ResourceStateReadSupport : ReadSupport<ResourceState>() {
+internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "time" to RESOURCE_STATE_TIMESTAMP,
+ "timestamp" to RESOURCE_STATE_TIMESTAMP,
+ "duration" to RESOURCE_STATE_DURATION,
+ "cores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
+ "cpu_usage" to RESOURCE_STATE_CPU_USAGE,
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index dec0fef9..1f4f6195 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("1023", reader.get(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
@@ -95,7 +96,7 @@ internal class OdcVmTraceFormatTest {
writer.endRow()
writer.close()
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest {
writer.endRow()
writer.close()
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -177,7 +186,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroupsEmpty() {
val path = Paths.get("src/test/resources/trace-v2.0")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS))
assertFalse(reader.nextRow())
reader.close()
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index b969f3ef..916a5eca 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 1698f644..c3d644e8 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class SwfTraceFormatTest {
@Test
fun testReader() {
val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index bc175b58..8db4c169 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 710de88e..4a8b2792 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -62,7 +62,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -89,7 +89,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReaderFull() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertDoesNotThrow {
while (reader.nextRow()) {
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index d6e42c8c..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport())
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 0017a4a9..8e7325de 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
*/
-internal class TaskReadSupport : ReadSupport<Task>() {
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap = mapOf<TableColumn<*>, String>(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id"
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/wtf-trace")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
assertAll(
{ assertTrue(reader.nextRow()) },