From 670cd279ea7789e07b6d778a21fdec68347ab305 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 14:17:55 +0200 Subject: feat(trace/api): Add support for projecting tables This change adds support for projecting certain columns of a table. This enables faster reading for tables with a high number of columns. Currently, we support projection in the Parquet-based workload formats. Other formats are text-based and will probably not benefit much from projection. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 6 ++-- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 4 ++- .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 4 ++- .../org/opendc/trace/azure/AzureTraceFormat.kt | 2 +- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 4 +-- .../trace/bitbrains/BitbrainsExTraceFormat.kt | 2 +- .../opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 2 +- .../trace/bitbrains/BitbrainsExTraceFormatTest.kt | 2 +- .../trace/bitbrains/BitbrainsTraceFormatTest.kt | 4 +-- .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 2 +- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 6 ++-- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 6 ++-- .../trace/opendc/parquet/ResourceReadSupport.kt | 40 ++++++++++++++++++++-- .../opendc/parquet/ResourceStateReadSupport.kt | 38 ++++++++++++++++++-- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 21 ++++++++---- .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 2 +- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 2 +- .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 2 +- .../trace/wfformat/WfFormatTraceFormatTest.kt | 4 +-- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 4 +-- .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 39 +++++++++++++++++++-- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 2 +- 22 files changed, 159 insertions(+), 39 deletions(-) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt 
b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index b0181cbc..05d0234a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -42,9 +42,11 @@ public interface Table { public val partitionKeys: List> /** - * Open a [TableReader] for this table. + * Open a [TableReader] for a projection of this table. + * + * @param projection The list of columns to fetch from the table or `null` if no projection is performed. */ - public fun newReader(): TableReader + public fun newReader(projection: List>? = null): TableReader /** * Open a [TableWriter] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index 24551edb..b848e19a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl override val partitionKeys: List> get() = details.partitionKeys - override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + override fun newReader(projection: List>?): TableReader { + return trace.format.newReader(trace.path, name, projection) + } override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index f2e610db..47761e0f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,6 +22,7 @@ package org.opendc.trace.spi +import 
org.opendc.trace.TableColumn import org.opendc.trace.TableReader import org.opendc.trace.TableWriter import java.nio.file.Path @@ -68,10 +69,11 @@ public interface TraceFormat { * * @param path The path to the trace to open. * @param table The name of the table to open a [TableReader] for. + * @param projection The list of [TableColumn]s to project or `null` if no projection is performed. * @throws IllegalArgumentException If [table] does not exist. * @return A [TableReader] instance for the table. */ - public fun newReader(path: Path, table: String): TableReader + public fun newReader(path: Path, table: String, projection: List>?): TableReader /** * Open a [TableWriter] for the specified [table]. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 8e3e60cc..73978990 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 56f9a940..263d26ce 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -57,7 +57,7 @@ class AzureTraceFormatTest { @Test fun testResources() { val 
path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -71,7 +71,7 @@ class AzureTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 11d21a04..82e454ad 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index e1e7604a..a374e951 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat 
{ } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { val vms = Files.walk(path, 1) diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 77429e3e..c944cb98 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/vm.txt") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 9309beb1..841801e6 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest { @Test fun testResources() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, 
TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 63688523..8d9eab82 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 9bf28ad7..411d45d0 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class GwfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -73,7 +73,7 @@ internal class GwfTraceFormatTest { @Test fun testReadingRowWithDependencies() { val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) // Move to row 7 for 
(x in 1..6) @@ -85,7 +85,7 @@ internal class GwfTraceFormatTest { { assertEquals("7", reader.get(TASK_ID)) }, { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, ) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b455a2cf..d45910c6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport()) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport()) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 47cce914..0d70446d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ 
b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport : ReadSupport() { +internal class ResourceReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf>( + "id" to RESOURCE_ID, + "submissionTime" to RESOURCE_START_TIME, + "start_time" to RESOURCE_START_TIME, + "endTime" to RESOURCE_STOP_TIME, + "stop_time" to RESOURCE_STOP_TIME, + "maxCores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpu_capacity" to RESOURCE_CPU_CAPACITY, + "requiredMemory" to RESOURCE_MEM_CAPACITY, + "mem_capacity" to RESOURCE_MEM_CAPACITY, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 17840ceb..97aa00b2 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ 
b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport : ReadSupport() { +internal class ResourceStateReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf>( + "id" to RESOURCE_ID, + "time" to RESOURCE_STATE_TIMESTAMP, + "timestamp" to RESOURCE_STATE_TIMESTAMP, + "duration" to RESOURCE_STATE_DURATION, + "cores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpuUsage" to RESOURCE_STATE_CPU_USAGE, + "cpu_usage" to RESOURCE_STATE_CPU_USAGE, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index dec0fef9..1f4f6195 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest { 
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, { assertEquals("1023", reader.get(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, @@ -95,7 +96,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -177,7 +186,7 @@ internal 
class OdcVmTraceFormatTest { @Test fun testInterferenceGroupsEmpty() { val path = Paths.get("src/test/resources/trace-v2.0") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index b969f3ef..916a5eca 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 1698f644..c3d644e8 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class SwfTraceFormatTest { @Test fun testReader() { val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt 
b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index bc175b58..8db4c169 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 710de88e..4a8b2792 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -62,7 +62,7 @@ class WfFormatTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/trace.json") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -89,7 +89,7 @@ class WfFormatTraceFormatTest { @Test fun testTableReaderFull() { val path = Paths.get("src/test/resources/trace.json") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertDoesNotThrow { while (reader.nextRow()) { diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index 
d6e42c8c..e71253ac 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 0017a4a9..8e7325de 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Task] objects. + * + * @param projection The projection of the table to read. */ -internal class TaskReadSupport : ReadSupport() { +internal class TaskReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping of table columns to their Parquet column names. 
+ */ + private val colMap = mapOf, String>( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id" + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0f0e422d..c0eb3f08 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -61,7 +61,7 @@ class WtfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/wtf-trace") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS)) assertAll( { assertTrue(reader.nextRow()) }, -- cgit v1.2.3