From ea5e79fc77072e6151ee7952581b97e35a2027fb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 1 May 2022 22:54:08 +0200 Subject: perf(trace/opendc): Read records using low-level API This change updates the OpenDC VM format reader implementation to use the low-level record reading APIs provided by the `parquet-mr` library for improved performance. Previously, we used the `parquet-avro` library to read/write Avro records in Parquet format, but that library carries considerable overhead. --- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) (limited to 'opendc-trace/opendc-trace-opendc/src/test') diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index c8742624..dec0fef9 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.conv.* +import java.nio.file.Files import java.nio.file.Paths +import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. @@ -78,6 +80,37 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourcesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_START_TIME, Instant.EPOCH) + writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) + writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { @@ -94,6 +127,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourceStatesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCE_STATES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCE_STATES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") -- cgit v1.2.3