From 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 11:48:18 +0200 Subject: refactor(trace): Unify columns of different tables This change unifies columns of different tables used by trace formats. Concretely, instead of having columns that are specific to each table (e.g., RESOURCE_ID and RESOURCE_STATE_ID), these columns are now shared between the tables under a single definition (RESOURCE_ID). --- .../kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt | 4 ++-- .../org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt | 10 +++++----- .../kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) (limited to 'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index bee4ba7e..39613070 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -35,10 +35,10 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { override val isSynthetic: Boolean = false override val columns: List> = listOf( - RESOURCE_STATE_ID, + RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_DURATION, - RESOURCE_STATE_CPU_COUNT, + RESOURCE_CPU_COUNT, RESOURCE_STATE_CPU_USAGE, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index df3bcfa6..e4b18735 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -57,10 +57,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun hasColumn(column: TableColumn<*>): Boolean { return when (column) { - RESOURCE_STATE_ID -> true + RESOURCE_ID -> true RESOURCE_STATE_TIMESTAMP -> true RESOURCE_STATE_DURATION -> true - RESOURCE_STATE_CPU_COUNT -> true + RESOURCE_CPU_COUNT -> true RESOURCE_STATE_CPU_USAGE -> true else -> false } } @@ -71,10 +71,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - RESOURCE_STATE_ID -> record[COL_ID].toString() + RESOURCE_ID -> record[COL_ID].toString() RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_STATE_CPU_COUNT -> getInt(RESOURCE_STATE_CPU_COUNT) + RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(column: TableColumn): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (column) { - RESOURCE_STATE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } } diff --git
a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 42eb369e..9fb6028d 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -111,7 +111,7 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_STATE_ID)) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) -- cgit v1.2.3 From 768bfa0d2ae763e359d74612385ce43c41afb432 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:12:10 +0200 Subject: feat(trace): Support column lookup via index This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces. --- .../trace/opendc/OdcVmResourceStateTableReader.kt | 82 ++++++++++++---------- .../trace/opendc/OdcVmResourceTableReader.kt | 82 ++++++++++++---------- 2 files changed, 88 insertions(+), 76 deletions(-) (limited to 'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index e4b18735..b5043f82 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun get(column: TableColumn): T { + override fun get(index: Int): Any? 
{ val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) + COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) + COL_CPU_COUNT -> getInt(index) + COL_CPU_USAGE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() + return when (index) { + COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - COL_DURATION = schema.getField("duration").pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + AVRO_COL_DURATION = schema.getField("duration").pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema", e) } } - private var COL_ID = -1 - private var COL_TIMESTAMP = -1 - private var COL_DURATION = -1 - private var COL_CPU_COUNT = -1 - private var COL_CPU_USAGE = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_TIMESTAMP = -1 + private var AVRO_COL_DURATION = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_CPU_USAGE = -1 + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to 
COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index c52da62d..d93929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun get(column: TableColumn): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) + COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() + return when (index) { + COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader Date: Mon, 20 Sep 2021 15:40:13 +0200 Subject: feat(trace): Add property for describing partition keys --- .../src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt | 2 ++ .../src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt | 2 ++ 2 files changed, 4 insertions(+) (limited to 
'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt index 39613070..caacf192 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceStateTable(private val path: Path) : Table { RESOURCE_STATE_CPU_USAGE, ) + override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("trace.parquet")) return OdcVmResourceStateTableReader(reader) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt index b1456560..653b28b8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt @@ -42,6 +42,8 @@ internal class OdcVmResourceTable(private val path: Path) : Table { RESOURCE_MEM_CAPACITY, ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("meta.parquet")) return OdcVmResourceTableReader(reader) -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. 
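For illustration, a minimal read-path sketch of the simplified SPI. It assumes the TraceFormat methods (getTables, newReader) and the index-based TableReader accessors (resolve, nextRow, get, getDouble) behave as shown in the patches below, and it points at the sample trace bundled with the test suite:

import java.nio.file.Paths
import org.opendc.trace.*
import org.opendc.trace.opendc.OdcVmTraceFormat

fun main() {
    val format = OdcVmTraceFormat()
    // Sample trace shipped with the test suite.
    val path = Paths.get("src/test/resources/trace-v2.1")

    // The format is now the single SPI entry point: tables, their details and
    // readers are all obtained from it instead of from separate Trace/Table objects.
    println(format.getTables(path))

    val reader = format.newReader(path, TABLE_RESOURCE_STATES)

    // Resolve column indices once, then use the index-based accessors per row.
    val colId = reader.resolve(RESOURCE_ID)
    val colCpuUsage = reader.resolve(RESOURCE_STATE_CPU_USAGE)

    while (reader.nextRow()) {
        val id = reader.get(colId)
        val cpuUsage = reader.getDouble(colCpuUsage)
        println("$id -> $cpuUsage")
    }
}

Resolving each column index once outside the row loop is the access pattern that the column-index lookup introduced earlier in this series is meant to enable on large traces.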
--- .../opendc/trace/opendc/OdcVmResourceStateTable.kt | 55 ---------------------- .../org/opendc/trace/opendc/OdcVmResourceTable.kt | 55 ---------------------- .../kotlin/org/opendc/trace/opendc/OdcVmTrace.kt | 49 ------------------- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 54 +++++++++++++++++---- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 47 +++++------------- 5 files changed, 55 insertions(+), 205 deletions(-) delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt delete mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt (limited to 'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt deleted file mode 100644 index caacf192..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource state [Table] in the OpenDC virtual machine trace format. 
- */ -internal class OdcVmResourceStateTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCE_STATES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ) - - override val partitionKeys: List> = listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("trace.parquet")) - return OdcVmResourceStateTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt deleted file mode 100644 index 653b28b8..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTable.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.avro.generic.GenericRecord -import org.opendc.trace.* -import org.opendc.trace.util.parquet.LocalParquetReader -import java.nio.file.Path - -/** - * The resource [Table] for the OpenDC virtual machine trace format. 
- */ -internal class OdcVmResourceTable(private val path: Path) : Table { - override val name: String = TABLE_RESOURCES - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY, - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - val reader = LocalParquetReader(path.resolve("meta.parquet")) - return OdcVmResourceTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Unknown partition $partition") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt deleted file mode 100644 index 3e5029b4..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTrace.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.opendc.trace.TABLE_RESOURCES -import org.opendc.trace.TABLE_RESOURCE_STATES -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * A [Trace] in the OpenDC virtual machine trace format. - */ -public class OdcVmTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - - override fun containsTable(name: String): Boolean = - name == TABLE_RESOURCES || name == TABLE_RESOURCE_STATES - - override fun getTable(name: String): Table? 
{ - return when (name) { - TABLE_RESOURCES -> OdcVmResourceTable(path) - TABLE_RESOURCE_STATES -> OdcVmResourceStateTable(path) - else -> null - } - } - - override fun toString(): String = "OdcVmTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 8edba725..29818147 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -24,11 +24,13 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. @@ -39,13 +41,45 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" - /** - * Open a Bitbrains Parquet trace. - */ - override fun open(url: URL): OdcVmTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return OdcVmTrace(path) + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_RESOURCES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_START_TIME, + RESOURCE_STOP_TIME, + RESOURCE_CPU_COUNT, + RESOURCE_MEM_CAPACITY, + ) + ) + TABLE_RESOURCE_STATES -> TableDetails( + listOf( + RESOURCE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_DURATION, + RESOURCE_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE, + ), + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_RESOURCES -> { + val reader = LocalParquetReader(path.resolve("meta.parquet")) + OdcVmResourceTableReader(reader) + } + TABLE_RESOURCE_STATES -> { + val reader = LocalParquetReader(path.resolve("trace.parquet")) + OdcVmResourceStateTableReader(reader) + } + else -> throw IllegalArgumentException("Table $table not supported") + } } public companion object { diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 9fb6028d..bfe0f881 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,8 +29,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.* -import java.io.File -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [OdcVmTraceFormat] implementation. 
@@ -38,53 +37,31 @@ import java.net.URL internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() - @Test - fun testTraceExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertDoesNotThrow { format.open(url) } - } - - @Test - fun testTraceDoesNotExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - assertThrows { - format.open(URL(url.toString() + "help")) - } - } - @Test fun testTables() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) + val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) } @Test fun testTableExists() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val table = format.open(url).getTable(TABLE_RESOURCE_STATES) + val path = Paths.get("src/test/resources/trace-v2.1") - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + assertDoesNotThrow { format.getDetails(path, TABLE_RESOURCE_STATES) } } @Test fun testTableDoesNotExist() { - val url = File("src/test/resources/trace-v2.1").toURI().toURL() - val trace = format.open(url) - - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + val path = Paths.get("src/test/resources/trace-v2.1") + assertThrows { format.getDetails(path, "test") } } @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCES) assertAll( { assertTrue(reader.nextRow()) }, @@ -104,10 +81,8 @@ internal class OdcVmTraceFormatTest { @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { - val url = File("src/test/resources/$name").toURI().toURL() - val trace = format.open(url) - - val reader = trace.getTable(TABLE_RESOURCE_STATES)!!.newReader() + val path = Paths.get("src/test/resources/$name") + val reader = format.newReader(path, TABLE_RESOURCE_STATES) assertAll( { assertTrue(reader.nextRow()) }, -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. 
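A rough usage sketch of the new writing API. It assumes the TableWriter methods shown in the patch below (startRow, resolve, set/setInt/setDouble, endRow, close) and that create() prepares an empty trace directory; the output location and the row values are hypothetical:

import java.nio.file.Files
import java.time.Duration
import java.time.Instant
import org.opendc.trace.*
import org.opendc.trace.opendc.OdcVmTraceFormat

fun main() {
    val format = OdcVmTraceFormat()
    // Hypothetical output location; create() builds the directory with empty tables.
    val path = Files.createTempDirectory("odc-vm-example").resolve("trace")
    format.create(path)

    val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
    val colId = writer.resolve(RESOURCE_ID)
    val colTimestamp = writer.resolve(RESOURCE_STATE_TIMESTAMP)
    val colDuration = writer.resolve(RESOURCE_STATE_DURATION)
    val colCpuCount = writer.resolve(RESOURCE_CPU_COUNT)
    val colCpuUsage = writer.resolve(RESOURCE_STATE_CPU_USAGE)

    // Resource states must be written in (id, timestamp) order; endRow() checks this.
    writer.startRow()
    writer.set(colId, "1019")
    writer.set(colTimestamp, Instant.ofEpochSecond(1376314846))
    writer.set(colDuration, Duration.ofMinutes(5))
    writer.setInt(colCpuCount, 1)
    writer.setDouble(colCpuUsage, 0.0)
    writer.endRow()

    writer.close()
}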
--- .../trace/opendc/OdcVmResourceStateTableWriter.kt | 123 +++++++++++++++++++++ .../trace/opendc/OdcVmResourceTableWriter.kt | 106 ++++++++++++++++++ .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 43 +++++++ 3 files changed, 272 insertions(+) create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt (limited to 'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt new file mode 100644 index 00000000..15a8cb85 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Duration +import java.time.Instant + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceStateTableWriter( + private val writer: ParquetWriter, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource state schema. 
+ */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + + val record = builder.build() + val id = record[COL_ID] as String + val timestamp = record[COL_TIMESTAMP] as Long + + check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + + writer.write(builder.build()) + + lastId = id + lastTimestamp = timestamp + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() + RESOURCE_STATE_DURATION -> schema.getField("duration").pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + + builder.set( + fields[index], + when (index) { + COL_TIMESTAMP -> (value as Instant).toEpochMilli() + COL_DURATION -> (value as Duration).toMillis() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Last column values that are used to check for correct partitioning. + */ + private var lastId: String? = null + private var lastTimestamp: Long = Long.MIN_VALUE + + /** + * Columns with special behavior. + */ + private val COL_ID = resolve(RESOURCE_ID) + private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) + private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt new file mode 100644 index 00000000..9cc6ca7d --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.avro.generic.GenericRecordBuilder +import org.apache.parquet.hadoop.ParquetWriter +import org.opendc.trace.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * A [TableWriter] implementation for the OpenDC virtual machine trace format. + */ +internal class OdcVmResourceTableWriter( + private val writer: ParquetWriter, + private val schema: Schema +) : TableWriter { + /** + * The current builder for the record that is being written. + */ + private var builder: GenericRecordBuilder? = null + + /** + * The fields belonging to the resource schema. + */ + private val fields = schema.fields + + override fun startRow() { + builder = GenericRecordBuilder(schema) + } + + override fun endRow() { + val builder = checkNotNull(builder) { "No active row" } + this.builder = null + writer.write(builder.build()) + } + + override fun resolve(column: TableColumn<*>): Int { + val schema = schema + return when (column) { + RESOURCE_ID -> schema.getField("id").pos() + RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + val builder = checkNotNull(builder) { "No active row" } + builder.set( + fields[index], + when (index) { + COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() + COL_MEM_CAPACITY -> (value as Double).roundToLong() + else -> value + } + ) + } + + override fun setBoolean(index: Int, value: Boolean) = set(index, value) + + override fun setInt(index: Int, value: Int) = set(index, value) + + override fun setLong(index: Int, value: Long) = set(index, value) + + override fun setDouble(index: Int, value: Double) = set(index, value) + + override fun flush() { + // Not available + } + + override fun close() { + writer.close() + } + + /** + * Columns with special behavior. 
+ */ + private val COL_START_TIME = resolve(RESOURCE_START_TIME) + private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) + private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 29818147..9b32f8fd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -25,11 +25,16 @@ package org.opendc.trace.opendc import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecord +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import java.nio.file.Files import java.nio.file.Path /** @@ -41,6 +46,18 @@ public class OdcVmTraceFormat : TraceFormat { */ override val name: String = "opendc-vm" + override fun create(path: Path) { + // Construct directory containing the trace files + Files.createDirectory(path) + + val tables = getTables(path) + + for (table in tables) { + val writer = newWriter(path, table) + writer.close() + } + } + override fun getTables(path: Path): List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) override fun getDetails(path: Path, table: String): TableDetails { @@ -82,6 +99,32 @@ public class OdcVmTraceFormat : TraceFormat { } } + override fun newWriter(path: Path, table: String): TableWriter { + return when (table) { + TABLE_RESOURCES -> { + val schema = RESOURCES_SCHEMA + val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("meta.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceTableWriter(writer, schema) + } + TABLE_RESOURCE_STATES -> { + val schema = RESOURCE_STATES_SCHEMA + val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("trace.parquet"))) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + OdcVmResourceStateTableWriter(writer, schema) + } + else -> throw IllegalArgumentException("Table $table not supported") + } + } + public companion object { /** * Schema for the resources table in the trace. -- cgit v1.2.3
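Finally, a hypothetical round-trip in the spirit of OdcVmTraceFormatTest, tying the writing and reading paths together: one resource is written to a fresh trace directory and read back. The method names follow the patches above; the temporary directory handling and the sample values are illustrative only:

import java.nio.file.Files
import java.time.Instant
import org.opendc.trace.*
import org.opendc.trace.opendc.OdcVmTraceFormat

fun main() {
    val format = OdcVmTraceFormat()
    val path = Files.createTempDirectory("odc-vm-roundtrip").resolve("trace")
    format.create(path)

    // Write a single entry to the resources table (meta.parquet).
    val writer = format.newWriter(path, TABLE_RESOURCES)
    writer.startRow()
    writer.set(writer.resolve(RESOURCE_ID), "1019")
    writer.set(writer.resolve(RESOURCE_START_TIME), Instant.ofEpochSecond(1376314846))
    writer.set(writer.resolve(RESOURCE_STOP_TIME), Instant.ofEpochSecond(1376401246))
    writer.setInt(writer.resolve(RESOURCE_CPU_COUNT), 1)
    writer.setDouble(writer.resolve(RESOURCE_MEM_CAPACITY), 4096.0)
    writer.endRow()
    writer.close()

    // Read the entry back through the same format instance.
    val reader = format.newReader(path, TABLE_RESOURCES)
    check(reader.nextRow()) { "Expected a single resource row" }
    check(reader.get(reader.resolve(RESOURCE_ID)) == "1019") { "Unexpected resource id" }
    check(reader.getInt(reader.resolve(RESOURCE_CPU_COUNT)) == 1) { "Unexpected CPU count" }
}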