From ea5e79fc77072e6151ee7952581b97e35a2027fb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 1 May 2022 22:54:08 +0200 Subject: perf(trace/opendc): Read records using low-level API This change updates the OpenDC VM format reader implementation to use the low-level record reading APIs provided by the `parquet-mr` library for improved performance. Previously, we used the `parquet-avro` library to read/write Avro records in Parquet format, but that library carries considerable overhead. --- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) (limited to 'opendc-trace/opendc-trace-opendc/src/test') diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index c8742624..dec0fef9 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.conv.* +import java.nio.file.Files import java.nio.file.Paths +import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. 
@@ -78,6 +80,37 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourcesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_START_TIME, Instant.EPOCH) + writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) + writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { @@ -94,6 +127,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourceStatesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCE_STATES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCE_STATES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(23.0, 
reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") -- cgit v1.2.3 From 670cd279ea7789e07b6d778a21fdec68347ab305 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 14:17:55 +0200 Subject: feat(trace/api): Add support for projecting tables This change adds support for projecting certain columns of a table. This enables faster reading for tables with a high number of columns. Currently, we support projection in the Parquet-based workload formats. Other formats are text-based and will probably not benefit much from projection. --- .../org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) (limited to 'opendc-trace/opendc-trace-opendc/src/test') diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index dec0fef9..1f4f6195 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, { assertEquals("1023", reader.get(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, @@ -95,7
+96,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -177,7 +186,7 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroupsEmpty() { val path = Paths.get("src/test/resources/trace-v2.0") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS)) assertFalse(reader.nextRow()) reader.close() -- cgit v1.2.3