From 412d8d597511122f114d69a4ba64c6b55dd192f9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 29 Apr 2022 12:09:51 +0200 Subject: feat(trace/calcite): Add Calcite (SQL) integration This change adds support for querying workload trace formats implemented using the OpenDC API through Apache Calcite. This allows users to write SQL queries to explore the workload traces. --- gradle/libs.versions.toml | 4 + .../main/kotlin/org/opendc/trace/TableColumn.kt | 2 +- .../opendc/trace/conv/InterferenceGroupColumns.kt | 7 +- .../org/opendc/trace/conv/ResourceColumns.kt | 15 ++- .../org/opendc/trace/conv/ResourceStateColumns.kt | 25 ++--- .../kotlin/org/opendc/trace/conv/TableColumns.kt | 4 +- .../kotlin/org/opendc/trace/conv/TaskColumns.kt | 25 ++--- opendc-trace/opendc-trace-calcite/build.gradle.kts | 37 +++++++ .../opendc/trace/calcite/TraceReaderEnumerator.kt | 93 ++++++++++++++++ .../kotlin/org/opendc/trace/calcite/TraceSchema.kt | 45 ++++++++ .../org/opendc/trace/calcite/TraceSchemaFactory.kt | 50 +++++++++ .../kotlin/org/opendc/trace/calcite/TraceTable.kt | 84 ++++++++++++++ .../kotlin/org/opendc/trace/calcite/CalciteTest.kt | 121 +++++++++++++++++++++ .../opendc/trace/calcite/TraceSchemaFactoryTest.kt | 78 +++++++++++++ .../test/resources/trace/interference-model.json | 20 ++++ .../src/test/resources/trace/meta.parquet | Bin 0 -> 1679 bytes .../src/test/resources/trace/trace.parquet | Bin 0 -> 65174 bytes settings.gradle.kts | 1 + 18 files changed, 571 insertions(+), 40 deletions(-) create mode 100644 opendc-trace/opendc-trace-calcite/build.gradle.kts create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json create mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet create mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 43568067..458ac973 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,4 +1,5 @@ [versions] +calcite = "1.30.0" classgraph = "4.8.143" clikt = "3.4.1" config = "1.4.2" @@ -101,6 +102,9 @@ quarkus-test-security = { module = "io.quarkus:quarkus-test-security" } restassured-core = { module = "io.rest-assured:rest-assured" } restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" } +# Calcite (SQL) +calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" } + # Other classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" } jakarta-validation = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakarta-validation" } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index 776c40c0..b77a2982 
100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -33,7 +33,7 @@ public class TableColumn(public val name: String, type: Class) { /** * The type of the column. */ - private val type: Class<*> = type + public val type: Class<*> = type /** * Determine whether the type of the column is a subtype of [column]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 532f6d24..5e8859e4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -24,22 +24,21 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column /** * Members of the interference group. */ @JvmField -public val INTERFERENCE_GROUP_MEMBERS: TableColumn> = column("interference_group:members") +public val INTERFERENCE_GROUP_MEMBERS: TableColumn> = column("members") /** * Target load after which the interference occurs. */ @JvmField -public val INTERFERENCE_GROUP_TARGET: TableColumn = column("interference_group:target") +public val INTERFERENCE_GROUP_TARGET: TableColumn = column("target") /** * Performance score when the interference occurs. */ @JvmField -public val INTERFERENCE_GROUP_SCORE: TableColumn = column("interference_group:score") +public val INTERFERENCE_GROUP_SCORE: TableColumn = column("score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index e9fc5d44..e602e534 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -24,47 +24,46 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Instant /** * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn = column("resource:id") +public val RESOURCE_ID: TableColumn = column("id") /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: TableColumn = column("resource:cluster_id") +public val RESOURCE_CLUSTER_ID: TableColumn = column("cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn = column("resource:start_time") +public val RESOURCE_START_TIME: TableColumn = column("start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn = column("resource:stop_time") +public val RESOURCE_STOP_TIME: TableColumn = column("stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn = column("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn = column("cpu_count") /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: TableColumn = column("resource:cpu_capacity") +public val RESOURCE_CPU_CAPACITY: TableColumn = column("cpu_capacity") /** * Memory capacity for the resource in KB. 
*/ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn = column("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn = column("mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index d5bbafd7..3a44f817 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("resource_state:timestamp") +public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn = column("resource_state:duration") +public val RESOURCE_STATE_DURATION: TableColumn = column("duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn = column("resource_state:powered_on") +public val RESOURCE_STATE_POWERED_ON: TableColumn = column("powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn = column("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn = column("cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = column("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = column("cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn = column("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn = column("cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = column("resource_state:cpu_ready_pct") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = column("cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn = column("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn = column("mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn = column("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn = column("disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn = column("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn = column("disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn = column("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn = column("net_rx") /** * Network transmit throughput of the resource in KB/s. 
*/ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn = column("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn = column("net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt index 31a58360..a58505e9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt @@ -21,7 +21,9 @@ */ @file:JvmName("TableColumns") -package org.opendc.trace +package org.opendc.trace.conv + +import org.opendc.trace.TableColumn /** * Construct a [TableColumn] with the specified [name] and type [T]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 397c0794..e6daafb7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn = column("task:id") +public val TASK_ID: TableColumn = column("id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn = column("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn = column("workflow_id") /** * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn = column("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn = column("submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn = column("task:wait_time") +public val TASK_WAIT_TIME: TableColumn = column("wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn = column("task:runtime") +public val TASK_RUNTIME: TableColumn = column("runtime") /** * A column containing the parents of a task. */ @JvmField -public val TASK_PARENTS: TableColumn> = column("task:parents") +public val TASK_PARENTS: TableColumn> = column("parents") /** * A column containing the children of a task. */ @JvmField -public val TASK_CHILDREN: TableColumn> = column("task:children") +public val TASK_CHILDREN: TableColumn> = column("children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn = column("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn = column("req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn = column("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn = column("alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn = column("task:status") +public val TASK_STATUS: TableColumn = column("status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn = column("task:group_id") +public val TASK_GROUP_ID: TableColumn = column("group_id") /** * A column containing the user id of a task. 
*/ @JvmField -public val TASK_USER_ID: TableColumn = column("task:user_id") +public val TASK_USER_ID: TableColumn = column("user_id") diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts new file mode 100644 index 00000000..2ffdac3c --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Apache Calcite (SQL) integration for the OpenDC trace library" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcTrace.opendcTraceApi) + + api(libs.calcite.core) + + testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc) + testRuntimeOnly(libs.slf4j.simple) +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt new file mode 100644 index 00000000..1854f262 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerator +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An [Enumerator] for a [TableReader]. + */ +internal class TraceReaderEnumerator( + private val reader: TableReader, + private val columns: List>, + private val cancelFlag: AtomicBoolean +) : Enumerator { + private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private var current: E? = null + + override fun moveNext(): Boolean { + if (cancelFlag.get()) { + return false + } + + val reader = reader + val res = reader.nextRow() + + if (res) { + @Suppress("UNCHECKED_CAST") + current = convertRow(reader) as E + } else { + current = null + } + + return res + } + + override fun current(): E = checkNotNull(current) + + override fun reset() { + throw UnsupportedOperationException() + } + + override fun close() { + reader.close() + } + + private fun convertRow(reader: TableReader): Array { + val res = arrayOfNulls(columns.size) + val columnIndices = columnIndices + + for ((index, column) in columns.withIndex()) { + val columnIndex = columnIndices[index] + res[index] = convertColumn(reader, column, columnIndex) + } + return res + } + + private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { + val value = reader.get(columnIndex) + + return when (column.type) { + Instant::class.java -> Timestamp.from(value as Instant) + Duration::class.java -> (value as Duration).toMillis() + Set::class.java -> (value as Set<*>).toTypedArray() + else -> value + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt new file mode 100644 index 00000000..298a59dc --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractSchema +import org.opendc.trace.Trace + +/** + * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. 
+ * + * @param trace The [Trace] to create a schema for. + */ +public class TraceSchema(private val trace: Trace) : AbstractSchema() { + + private val tables: Map by lazy { + trace.tables.associateWith { + val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } + TraceTable(table) + } + } + + override fun getTableMap(): Map = tables +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt new file mode 100644 index 00000000..3c6badc8 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.model.ModelHandler +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.SchemaFactory +import org.apache.calcite.schema.SchemaPlus +import org.opendc.trace.Trace +import java.io.File +import java.nio.file.Paths + +/** + * Factory that creates a [TraceSchema]. + * + * This factory allows users to include a schema that references a trace in a `model.json` file. + */ +public class TraceSchemaFactory : SchemaFactory { + override fun create(parentSchema: SchemaPlus, name: String, operand: Map): Schema { + val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? 
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String + val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) + + val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String + val create = operand.getOrDefault("create", false) as Boolean + + val trace = if (create) Trace.create(path, format) else Trace.open(path, format) + return TraceSchema(trace) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt new file mode 100644 index 00000000..8c3fe4e2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.AbstractEnumerable +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.Enumerator +import org.apache.calcite.rel.type.RelDataType +import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.schema.ScannableTable +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.sql.type.SqlTypeName +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. + */ +internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable { + + private var rowType: RelDataType? 
= null + + override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { + var rowType = rowType + if (rowType == null) { + rowType = deduceRowType(typeFactory as JavaTypeFactory) + this.rowType = rowType + } + + return rowType + } + + override fun scan(root: DataContext): Enumerable> { + val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root) + return object : AbstractEnumerable>() { + override fun enumerator(): Enumerator> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag) + } + } + + override fun toString(): String = "TraceTable" + + private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { + val types = mutableListOf() + val names = mutableListOf() + + for (column in table.columns) { + names.add(column.name) + types.add( + when (column.type) { + Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1) + else -> typeFactory.createType(column.type) + } + ) + } + + return typeFactory.createStructType(types, names) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt new file mode 100644 index 00000000..f0e461e0 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.jdbc.CalciteConnection +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.opendc.trace.Trace +import java.nio.file.Paths +import java.sql.DriverManager +import java.sql.ResultSet +import java.sql.Timestamp +import java.util.* + +/** + * Smoke test for Apache Calcite integration. + */ +class CalciteTest { + /** + * The trace to experiment with. 
+ */ + private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + + @Test + fun testResources() { + runQuery(trace, "SELECT * FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + { assertTrue(rs.next()) }, + { assertEquals("1023", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1052", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1073", rs.getString("id")) }, + { assertFalse(rs.next()) } + ) + } + } + + @Test + fun testResourceStates() { + runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) }, + { assertEquals(300000, rs.getLong("duration")) }, + { assertEquals(0.0, rs.getDouble("cpu_usage")) }, + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + ) + } + } + + @Test + fun testInterferenceGroups() { + runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, + { assertEquals(0.0, rs.getDouble("target")) }, + { assertEquals(0.8830158730158756, rs.getDouble("score")) }, + ) + } + } + + @Test + fun testComplexQuery() { + runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, + { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }, + ) + } + } + + /** + * Helper function to run statement for the specified trace. 
+ */ + private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) + connection.rootSchema.add("trace", TraceSchema(trace)) + + val stmt = connection.createStatement() + val results = stmt.executeQuery(query) + try { + block(results) + } finally { + results.close() + stmt.close() + connection.close() + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt new file mode 100644 index 00000000..0a552e74 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.sql.DriverManager +import java.sql.Timestamp +import java.util.* + +/** + * Test suite for [TraceSchemaFactory]. 
+ */ +class TraceSchemaFactoryTest { + @Test + fun testSmoke() { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info) + val stmt = connection.createStatement() + val rs = stmt.executeQuery("SELECT * FROM trace.resources") + try { + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + ) + } finally { + rs.close() + stmt.close() + connection.close() + } + } + + @Test + fun testWithoutParams() { + assertThrows { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory") + } + } + + @Test + fun testWithoutPath() { + assertThrows { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm") + } + } + + @Test + fun testWithoutFormat() { + assertThrows { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace") + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json new file mode 100644 index 00000000..6a0616d9 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json @@ -0,0 +1,20 @@ +[ + { + "vms": [ + "1019", + "1023", + "1052" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "1023", + "1052", + "1073" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet new file mode 100644 index 00000000..d8184945 Binary files /dev/null and b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet differ diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet new file mode 100644 index 00000000..00ab5835 Binary files /dev/null and b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet differ diff --git a/settings.gradle.kts b/settings.gradle.kts index cc454b19..a779edcc 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -59,6 +59,7 @@ include(":opendc-trace:opendc-trace-bitbrains") include(":opendc-trace:opendc-trace-azure") include(":opendc-trace:opendc-trace-opendc") include(":opendc-trace:opendc-trace-parquet") +include(":opendc-trace:opendc-trace-calcite") include(":opendc-trace:opendc-trace-tools") include(":opendc-harness:opendc-harness-api") include(":opendc-harness:opendc-harness-engine") -- cgit v1.2.3 From 157fc322e6e68eb77177d13844e9793aeb6e8850 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 29 Apr 2022 22:55:07 +0200 Subject: feat(trace/calcite): Add support for writing via SQL This change updates the Apache Calcite integration to support writing workload traces via SQL. This enables custom conversion scripts between different workload traces. 
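
For illustration, assuming a source and a target trace are both registered as schemas (for example via a model.json, as in the tests of this change), a conversion between formats can be expressed as a single SQL statement. The schema names source and target are placeholders; the table and column names match the resources table exposed by this integration:

    INSERT INTO target.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
    SELECT id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity
    FROM source.resources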
--- .../org/opendc/trace/calcite/InsertableTable.kt | 39 ++++++ .../kotlin/org/opendc/trace/calcite/TraceSchema.kt | 4 +- .../kotlin/org/opendc/trace/calcite/TraceTable.kt | 107 ++++++++++++++-- .../org/opendc/trace/calcite/TraceTableModify.kt | 138 +++++++++++++++++++++ .../opendc/trace/calcite/TraceTableModifyRule.kt | 65 ++++++++++ .../kotlin/org/opendc/trace/calcite/CalciteTest.kt | 43 ++++++- .../src/test/resources/model.json | 15 +++ 7 files changed, 399 insertions(+), 12 deletions(-) create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt create mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/model.json diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt new file mode 100644 index 00000000..9c7b69a2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.schema.Table + +/** + * A Calcite [Table] to which rows can be inserted. + */ +internal interface InsertableTable : Table { + /** + * Insert [rows] into this table. + * + * @param rows The rows to insert into the table. + * @return The number of rows inserted. + */ + fun insert(rows: Enumerable>): Long +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt index 298a59dc..3249546d 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -33,7 +33,9 @@ import org.opendc.trace.Trace * @param trace The [Trace] to create a schema for. */ public class TraceSchema(private val trace: Trace) : AbstractSchema() { - + /** + * The [Table]s that belong to this schema. 
+ */ private val tables: Map by lazy { trace.tables.associateWith { val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 8c3fe4e2..af521297 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -23,15 +23,23 @@ package org.opendc.trace.calcite import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.AbstractQueryableTable import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.linq4j.AbstractEnumerable -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.linq4j.Enumerator +import org.apache.calcite.linq4j.* +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.prepare.Prepare.CatalogReader +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify import org.apache.calcite.rel.type.RelDataType import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.ModifiableTable import org.apache.calcite.schema.ScannableTable +import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.Table -import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName import java.time.Duration import java.time.Instant @@ -40,8 +48,11 @@ import java.util.concurrent.atomic.AtomicBoolean /** * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. */ -internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable { - +internal class TraceTable(private val table: org.opendc.trace.Table) : + AbstractQueryableTable(Array::class.java), + ScannableTable, + ModifiableTable, + InsertableTable { private var rowType: RelDataType? 
= null override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { @@ -57,8 +68,88 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT override fun scan(root: DataContext): Enumerable> { val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root) return object : AbstractEnumerable>() { - override fun enumerator(): Enumerator> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag) + override fun enumerator(): Enumerator> = + TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag) + } + } + + override fun insert(rows: Enumerable>): Long { + val table = table + val columns = table.columns + val writer = table.newWriter() + val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + var rowCount = 0L + + try { + for (row in rows) { + writer.startRow() + + for ((index, value) in row.withIndex()) { + if (value == null) { + continue + } + val columnType = columns[index].type + + writer.set( + columnIndices[index], + when (columnType) { + Duration::class.java -> Duration.ofMillis(value as Long) + Instant::class.java -> Instant.ofEpochMilli(value as Long) + Set::class.java -> (value as List<*>).toSet() + else -> value + } + ) + } + + writer.endRow() + + rowCount++ + } + } finally { + writer.close() } + + return rowCount + } + + override fun asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable { + return object : AbstractTableQueryable(queryProvider, schema, this@TraceTable, tableName) { + override fun enumerator(): Enumerator { + val cancelFlag = AtomicBoolean(false) + return TraceReaderEnumerator( + this@TraceTable.table.newReader(), + this@TraceTable.table.columns, + cancelFlag + ) + } + + override fun toString(): String = "TraceTableQueryable[table=$tableName]" + } + } + + override fun getModifiableCollection(): MutableCollection? 
= null + + override fun toModificationRel( + cluster: RelOptCluster, + table: RelOptTable, + catalogReader: CatalogReader, + child: RelNode, + operation: TableModify.Operation, + updateColumnList: MutableList?, + sourceExpressionList: MutableList?, + flattened: Boolean + ): TableModify { + cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) + + return LogicalTableModify.create( + table, + catalogReader, + child, + operation, + updateColumnList, + sourceExpressionList, + flattened + ) } override fun toString(): String = "TraceTable" @@ -73,7 +164,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT when (column.type) { Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) - Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) else -> typeFactory.createType(column.type) } ) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt new file mode 100644 index 00000000..64dc0cea --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.* +import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.tree.BlockBuilder +import org.apache.calcite.linq4j.tree.Expressions +import org.apache.calcite.linq4j.tree.Types +import org.apache.calcite.plan.* +import org.apache.calcite.prepare.Prepare +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.ModifiableTable +import org.apache.calcite.util.BuiltInMethod +import java.lang.reflect.Method + +/** + * A [TableModify] expression that modifies a workload trace. 
+ */ +internal class TraceTableModify( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + schema: Prepare.CatalogReader, + input: RelNode, + operation: Operation, + updateColumnList: List?, + sourceExpressionList: List?, + flattened: Boolean +) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), + EnumerableRel { + init { + // Make sure the table is modifiable + table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator + } + + override fun copy(traitSet: RelTraitSet, inputs: List?): RelNode { + return TraceTableModify( + cluster, + traitSet, + table, + getCatalogReader(), + sole(inputs), + operation, + updateColumnList, + sourceExpressionList, + isFlattened + ) + } + + override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost { + // Prefer this plan compared to the standard EnumerableTableModify. + return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) + } + + override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result { + val builder = BlockBuilder() + val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) + val childExp = builder.append("child", result.block) + val convertedChildExpr = if (getInput().rowType != rowType) { + val typeFactory = cluster.typeFactory as JavaTypeFactory + val format = EnumerableTableScan.deduceFormat(table) + val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) + val childPhysType = result.physType + val o = Expressions.parameter(childPhysType.javaRowType, "o") + val expressionList = List(childPhysType.rowType.fieldCount) { i -> + childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) + } + + builder.append( + "convertedChild", + Expressions.call( + childExp, + BuiltInMethod.SELECT.method, + Expressions.lambda>(physType.record(expressionList), o) + ) + ) + } else { + childExp + } + + if (!isInsert) { + throw UnsupportedOperationException("Deletion and update not supported") + } + + val expression = table.getExpression(InsertableTable::class.java) + builder.add( + Expressions.return_( + null, + Expressions.call( + BuiltInMethod.SINGLETON_ENUMERABLE.method, + Expressions.call( + Long::class.java, + expression, + INSERT_METHOD, + convertedChildExpr, + ) + ) + ) + ) + + val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR + val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat) + return implementor.result(physType, builder.toBlock()) + } + + private companion object { + /** + * Reference to [InsertableTable.insert] method. 
+ */ + val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt new file mode 100644 index 00000000..7572e381 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.EnumerableConvention +import org.apache.calcite.plan.Convention +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify +import org.apache.calcite.schema.ModifiableTable + +/** + * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify]. + */ +internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { + override fun convert(rel: RelNode): RelNode? { + val modify = rel as TableModify + val table = modify.table!! + + // Make sure that the table is modifiable + if (table.unwrap(ModifiableTable::class.java) == null) { + return null + } + + val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE) + return TraceTableModify( + modify.cluster, traitSet, + table, + modify.catalogReader, + convert(modify.input, traitSet), + modify.operation, + modify.updateColumnList, + modify.sourceExpressionList, + modify.isFlattened + ) + } + + companion object { + /** Default configuration. 
*/ + val DEFAULT: Config = Config.INSTANCE + .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule") + .withRuleFactory { config: Config -> TraceTableModifyRule(config) } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt index f0e461e0..d2877d7c 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -26,9 +26,11 @@ import org.apache.calcite.jdbc.CalciteConnection import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.opendc.trace.Trace +import java.nio.file.Files import java.nio.file.Paths import java.sql.DriverManager import java.sql.ResultSet +import java.sql.Statement import java.sql.Timestamp import java.util.* @@ -99,21 +101,56 @@ class CalciteTest { } } + @Test + fun testInsert() { + val tmp = Files.createTempDirectory("opendc") + val newTrace = Trace.create(tmp, "opendc-vm") + + runStatement(newTrace) { stmt -> + val count = stmt.executeUpdate( + """ + INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) + VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0) + """.trimIndent() + ) + assertEquals(1, count) + } + + runQuery(newTrace, "SELECT * FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1234", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(2926.0, rs.getDouble("cpu_capacity")) }, + { assertEquals(1024.0, rs.getDouble("mem_capacity")) } + ) + } + } + /** * Helper function to run statement for the specified trace. */ private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) { + runStatement(trace) { stmt -> + val rs = stmt.executeQuery(query) + rs.use { block(rs) } + } + } + + /** + * Helper function to run statement for the specified trace. + */ + private fun runStatement(trace: Trace, block: (Statement) -> Unit) { val info = Properties() info.setProperty("lex", "JAVA") val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) connection.rootSchema.add("trace", TraceSchema(trace)) val stmt = connection.createStatement() - val results = stmt.executeQuery(query) try { - block(results) + block(stmt) } finally { - results.close() stmt.close() connection.close() } diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json new file mode 100644 index 00000000..91e2657f --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json @@ -0,0 +1,15 @@ +{ + "version": "1.0", + "defaultSchema": "trace", + "schemas": [ + { + "name": "trace", + "type": "custom", + "factory": "org.opendc.trace.calcite.TraceSchemaFactory", + "operand": { + "path": "trace", + "format": "opendc-vm" + } + } + ] +} -- cgit v1.2.3 From 5c1b52bc771cddafed26da3c26612aeb115a3c0e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 30 Apr 2022 17:41:20 +0200 Subject: feat(trace/tools): Add support for querying traces using SQL This change adds a command line interface for querying workload traces using SQL. 
We provide a new command for the trace tools that can query a workload trace. --- gradle/libs.versions.toml | 2 + opendc-trace/opendc-trace-tools/build.gradle.kts | 8 +- .../org/opendc/trace/tools/ConvertCommand.kt | 475 ++++++++++++++++++++ .../kotlin/org/opendc/trace/tools/QueryCommand.kt | 159 +++++++ .../org/opendc/trace/tools/TraceConverter.kt | 481 --------------------- .../kotlin/org/opendc/trace/tools/TraceTools.kt | 44 ++ 6 files changed, 687 insertions(+), 482 deletions(-) create mode 100644 opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt create mode 100644 opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt delete mode 100644 opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt create mode 100644 opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 458ac973..a5c0f184 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -9,6 +9,7 @@ gradle-node = "3.2.1" hadoop = "3.3.1" jackson = "2.13.2" jandex-gradle = "0.12.0" +jline = "3.21.0" jmh-gradle = "0.6.6" jakarta-validation = "2.0.2" junit-jupiter = "5.8.2" @@ -104,6 +105,7 @@ restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" } # Calcite (SQL) calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" } +jline = { module = "org.jline:jline", version.ref = "jline" } # Other classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 0c1e179e..e98fb932 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,16 +29,22 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverter") + mainClass.set("org.opendc.trace.tools.TraceTools") } dependencies { implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTrace.opendcTraceCalcite) implementation(libs.kotlin.logging) implementation(libs.clikt) + implementation(libs.jline) runtimeOnly(projects.opendcTrace.opendcTraceOpendc) runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) runtimeOnly(projects.opendcTrace.opendcTraceAzure) + runtimeOnly(projects.opendcTrace.opendcTraceGwf) + runtimeOnly(projects.opendcTrace.opendcTraceSwf) + runtimeOnly(projects.opendcTrace.opendcTraceWfformat) + runtimeOnly(projects.opendcTrace.opendcTraceWtf) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt new file mode 100644 index 00000000..970de0f4 --- /dev/null +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt @@ -0,0 +1,475 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice 
and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.tools + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.cooccurring +import com.github.ajalt.clikt.parameters.groups.defaultByName +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.* +import mu.KotlinLogging +import org.opendc.trace.* +import org.opendc.trace.conv.* +import java.io.File +import java.time.Duration +import java.time.Instant +import java.util.* +import kotlin.collections.HashMap +import kotlin.math.abs +import kotlin.math.max +import kotlin.math.min + +/** + * A [CliktCommand] that can convert between workload trace formats. + */ +internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert between workload trace formats") { + /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + + /** + * The directory where the trace should be stored. + */ + private val output by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val input by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input format of the trace. + */ + private val inputFormat by option("-f", "--input-format", help = "format of input trace") + .required() + + /** + * The format of the output trace. + */ + private val outputFormat by option("--output-format", help = "format of output trace") + .default("opendc-vm") + + /** + * The sampling options. + */ + private val samplingOptions by SamplingOptions().cooccurring() + + /** + * The converter strategy to use. 
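+     * Supported choices are `default` and `azure`, mapping to [DefaultTraceConverter] and [AzureTraceConverter] below.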
+ */ + private val converter by option("-c", "--converter", help = "converter strategy to use").groupChoice( + "default" to DefaultTraceConverter(), + "azure" to AzureTraceConverter(), + ).defaultByName("default") + + override fun run() { + val metaParquet = File(output, "meta.parquet") + val traceParquet = File(output, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val inputTrace = Trace.open(input, format = inputFormat) + val outputTrace = Trace.create(output, format = outputFormat) + + logger.info { "Building resources table" } + + val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() + + val selectedVms = metaWriter.use { converter.convertResources(inputTrace, it, samplingOptions) } + + if (selectedVms.isEmpty()) { + logger.warn { "No VMs selected" } + return + } + + logger.info { "Wrote ${selectedVms.size} rows" } + logger.info { "Building resource states table" } + + val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() + + val statesCount = writer.use { converter.convertResourceStates(inputTrace, it, selectedVms) } + logger.info { "Wrote $statesCount rows" } + } + + /** + * Options for sampling the workload trace. + */ + private class SamplingOptions : OptionGroup() { + /** + * The fraction of VMs to sample + */ + val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") + .double() + .restrictTo(0.0001, 1.0) + .required() + + /** + * The seed for sampling the trace. + */ + val seed by option("--sampling-seed", help = "seed for sampling the workload") + .long() + .default(0) + } + + /** + * A trace conversion strategy. + */ + private sealed class TraceConverter(name: String) : OptionGroup(name) { + /** + * Convert the resources table for the trace. + * + * @param trace The trace to convert. + * @param writer The table writer for the target format. + * @param samplingOptions The sampling options to use. + * @return The map of resources that have been selected. + */ + abstract fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map + + /** + * Convert the resource states table for the trace. + * + * @param trace The trace to convert. + * @param writer The table writer for the target format. + * @param selected The set of virtual machines that have been selected. + * @return The number of rows written. + */ + abstract fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int + + /** + * A resource in the resource table. + */ + data class Resource( + val id: String, + val startTime: Instant, + val stopTime: Instant, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double + ) + } + + /** + * Default implementation of [TraceConverter]. + */ + private class DefaultTraceConverter : TraceConverter("default") { + /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + + /** + * The interval at which the samples where taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. 
+ */ + private val SAMPLE_CASCADE_DIFF = 0.1 + + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { + val random = samplingOptions?.let { Random(it.seed) } + val samplingFraction = samplingOptions?.fraction ?: 1.0 + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var hasNextRow = reader.nextRow() + val selectedVms = mutableMapOf() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) + val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) + val memUsageCol = reader.resolve(RESOURCE_STATE_MEM_USAGE) + + while (hasNextRow) { + var id: String + var cpuCount = 0 + var cpuCapacity = 0.0 + var memCapacity = 0.0 + var memUsage = 0.0 + var startTime = Long.MAX_VALUE + var stopTime = Long.MIN_VALUE + + do { + id = reader.get(idCol) as String + + val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + startTime = min(startTime, timestamp) + stopTime = max(stopTime, timestamp) + + cpuCount = max(cpuCount, reader.getInt(cpuCountCol)) + cpuCapacity = max(cpuCapacity, reader.getDouble(cpuCapacityCol)) + memCapacity = max(memCapacity, reader.getDouble(memCapacityCol)) + if (memUsageCol > 0) { + memUsage = max(memUsage, reader.getDouble(memUsageCol)) + } + + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_ID)) + + // Sample only a fraction of the VMs + if (random != null && random.nextDouble() > samplingFraction) { + continue + } + + logger.info { "Selecting VM $id" } + + val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval + val stopInstant = Instant.ofEpochMilli(stopTime) + + selectedVms.computeIfAbsent(id) { + Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, max(memCapacity, memUsage)) + } + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, startInstant) + writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) + writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.endRow() + } + + return selectedVms + } + + override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val sampleInterval = SAMPLE_INTERVAL.toMillis() + + val idCol = reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + + var hasNextRow = reader.nextRow() + var count = 0 + + while (hasNextRow) { + val id = reader.get(idCol) as String + val resource = selected[id] + if (resource == null) { + hasNextRow = reader.nextRow() + continue + } + + val cpuCount = reader.getInt(cpuCountCol) + val cpuUsage = reader.getDouble(cpuUsageCol) + + val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + var timestamp: Long = startTimestamp + var duration: Long = sampleInterval + + // Attempt to cascade further samples into one if they share the same CPU usage + while (reader.nextRow().also { hasNextRow = it }) { + val shouldCascade = id == reader.get(idCol) && + abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && + cpuCount == 
reader.getInt(cpuCountCol) + + // Check whether the next sample can be cascaded with the current sample: + // (1) The VM identifier of both samples matches + // (2) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF` + // (3) The CPU count of both samples is identical + if (!shouldCascade) { + break + } + + val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + + // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` + if ((nextTimestamp - timestamp) > sampleInterval) { + break + } + + duration += nextTimestamp - timestamp + timestamp = nextTimestamp + } + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) + writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.endRow() + + count++ + } + + return count + } + } + + /** + * Implementation of [TraceConverter] for the Azure trace format. + */ + private class AzureTraceConverter : TraceConverter("default") { + /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + + /** + * CPU capacity of the machines used by Azure. + */ + private val CPU_CAPACITY = 2500.0 + + /** + * The interval at which the samples where taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. + */ + private val SAMPLE_CASCADE_DIFF = 0.1 + + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { + val random = samplingOptions?.let { Random(it.seed) } + val samplingFraction = samplingOptions?.fraction ?: 1.0 + val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + + val idCol = reader.resolve(RESOURCE_ID) + val startTimeCol = reader.resolve(RESOURCE_START_TIME) + val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) + + val selectedVms = mutableMapOf() + + while (reader.nextRow()) { + // Sample only a fraction of the VMs + if (random != null && random.nextDouble() > samplingFraction) { + continue + } + + val id = reader.get(idCol) as String + val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli() + val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli() + val cpuCount = reader.getInt(cpuCountCol) + val memCapacity = reader.getDouble(memCapacityCol) + + logger.info { "Selecting VM $id" } + + val startInstant = Instant.ofEpochMilli(startTime) + val stopInstant = Instant.ofEpochMilli(stopTime) + val cpuCapacity = cpuCount * CPU_CAPACITY + + selectedVms.computeIfAbsent(id) { + Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, memCapacity) + } + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, startInstant) + writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) + writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) + writer.endRow() + } + + return selectedVms + } + + override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val states = HashMap() + val sampleInterval = SAMPLE_INTERVAL.toMillis() + + val idCol = 
reader.resolve(RESOURCE_ID) + val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) + val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE_PCT) + + var count = 0 + + while (reader.nextRow()) { + val id = reader.get(idCol) as String + val resource = selected[id] ?: continue + + val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz + val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } + val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val delta = (timestamp - state.time) + + // Check whether the next sample can be cascaded with the current sample: + // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`) + // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL` + if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) { + state.time = timestamp + state.duration += delta + continue + } + + state.write(writer) + // Reset the state fields + state.time = timestamp + state.duration = sampleInterval + // Count write + count++ + } + + for ((_, state) in states) { + state.write(writer) + count++ + } + + return count + } + + private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) { + @JvmField var time: Long = resource.startTime.toEpochMilli() + private var lastWrite: Long = Long.MIN_VALUE + + fun write(writer: TableWriter) { + // Check whether this timestamp was already written + if (lastWrite == time) { + return + } + lastWrite = time + + writer.startRow() + writer.set(RESOURCE_ID, resource.id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) + writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) + writer.endRow() + } + } + } +} diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt new file mode 100644 index 00000000..b0f95de2 --- /dev/null +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.tools + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.types.file +import org.apache.calcite.jdbc.CalciteConnection +import org.jline.builtins.Styles +import org.jline.console.Printer +import org.jline.console.impl.DefaultPrinter +import org.jline.terminal.Terminal +import org.jline.terminal.TerminalBuilder +import org.jline.utils.AttributedStringBuilder +import org.opendc.trace.Trace +import org.opendc.trace.calcite.TraceSchema +import java.nio.charset.StandardCharsets +import java.sql.DriverManager +import java.sql.ResultSet +import java.sql.ResultSetMetaData +import java.util.* + +/** + * A [CliktCommand] that allows users to query workload traces using SQL. + */ +internal class QueryCommand : CliktCommand(name = "query", help = "Query workload traces") { + /** + * The trace to open. + */ + private val input by option("-i", "--input") + .file(mustExist = true) + .required() + + /** + * The input format of the trace. + */ + private val inputFormat by option("-f", "--format", help = "format of the trace") + .required() + + /** + * The query to execute. + */ + private val query by argument() + + /** + * Access to the terminal. + */ + private val terminal = TerminalBuilder.builder() + .system(false) + .streams(System.`in`, System.out) + .encoding(StandardCharsets.UTF_8) + .build() + + /** + * Helper class to print results to console. + */ + private val printer = QueryPrinter(terminal) + + override fun run() { + val inputTrace = Trace.open(input, format = inputFormat) + val info = Properties().apply { this["lex"] = "JAVA" } + val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) + connection.rootSchema.add("trace", TraceSchema(inputTrace)) + connection.schema = "trace" + + val stmt = connection.createStatement() + stmt.executeQuery(query) + + val start = System.currentTimeMillis() + val hasResults = stmt.execute(query) + + try { + if (hasResults) { + do { + stmt.resultSet.use { rs -> + val count: Int = printResults(rs) + val duration = (System.currentTimeMillis() - start) / 1000.0 + printer.println("$count rows selected (${"%.3f".format(duration)} seconds)") + } + } while (stmt.moreResults) + } else { + val count: Int = stmt.updateCount + val duration = (System.currentTimeMillis() - start) / 1000.0 + + printer.println("$count rows affected (${"%0.3f".format(duration)} seconds)") + } + } finally { + stmt.close() + connection.close() + } + } + + /** + * Helper function to print the results to console. + */ + private fun printResults(rs: ResultSet): Int { + var count = 0 + val meta: ResultSetMetaData = rs.metaData + + val options = mapOf( + Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) }, + Printer.BORDER to "|", + ) + val data = mutableListOf>() + + while (rs.next()) { + val row = mutableMapOf() + for (i in 1..meta.columnCount) { + row[meta.getColumnName(i)] = rs.getObject(i) + } + data.add(row) + + count++ + } + + printer.println(options, data) + + return count + } + + /** + * Helper class to print the results of the query. 
+ */ + private class QueryPrinter(private val terminal: Terminal) : DefaultPrinter(null) { + override fun terminal(): Terminal = terminal + + override fun highlightAndPrint(options: MutableMap, exception: Throwable) { + if (options.getOrDefault("exception", "stack") == "stack") { + exception.printStackTrace() + } else { + val asb = AttributedStringBuilder() + asb.append(exception.message, Styles.prntStyle().resolve(".em")) + asb.toAttributedString().println(terminal()) + } + } + } +} diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt deleted file mode 100644 index c71035d4..00000000 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("TraceConverter") -package org.opendc.trace.tools - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.cooccurring -import com.github.ajalt.clikt.parameters.groups.defaultByName -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.* -import mu.KotlinLogging -import org.opendc.trace.* -import org.opendc.trace.conv.* -import java.io.File -import java.time.Duration -import java.time.Instant -import java.util.* -import kotlin.collections.HashMap -import kotlin.math.abs -import kotlin.math.max -import kotlin.math.min - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -fun main(args: Array): Unit = TraceConverterCli().main(args) - -/** - * Represents the command for converting traces - */ -internal class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The logger instance for the converter. - */ - private val logger = KotlinLogging.logger {} - - /** - * The directory where the trace should be stored. - */ - private val output by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. 
- */ - private val input by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input format of the trace. - */ - private val inputFormat by option("-f", "--input-format", help = "format of output trace") - .required() - - /** - * The format of the output trace. - */ - private val outputFormat by option("--output-format", help = "format of output trace") - .default("opendc-vm") - - /** - * The sampling options. - */ - private val samplingOptions by SamplingOptions().cooccurring() - - /** - * The converter strategy to use. - */ - private val converter by option("-c", "--converter", help = "converter strategy to use").groupChoice( - "default" to DefaultTraceConverter(), - "azure" to AzureTraceConverter(), - ).defaultByName("default") - - override fun run() { - val metaParquet = File(output, "meta.parquet") - val traceParquet = File(output, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val inputTrace = Trace.open(input, format = inputFormat) - val outputTrace = Trace.create(output, format = outputFormat) - - logger.info { "Building resources table" } - - val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() - - val selectedVms = metaWriter.use { converter.convertResources(inputTrace, it, samplingOptions) } - - if (selectedVms.isEmpty()) { - logger.warn { "No VMs selected" } - return - } - - logger.info { "Wrote ${selectedVms.size} rows" } - logger.info { "Building resource states table" } - - val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() - - val statesCount = writer.use { converter.convertResourceStates(inputTrace, it, selectedVms) } - logger.info { "Wrote $statesCount rows" } - } - - /** - * Options for sampling the workload trace. - */ - private class SamplingOptions : OptionGroup() { - /** - * The fraction of VMs to sample - */ - val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") - .double() - .restrictTo(0.0001, 1.0) - .required() - - /** - * The seed for sampling the trace. - */ - val seed by option("--sampling-seed", help = "seed for sampling the workload") - .long() - .default(0) - } - - /** - * A trace conversion strategy. - */ - private sealed class TraceConverter(name: String) : OptionGroup(name) { - /** - * Convert the resources table for the trace. - * - * @param trace The trace to convert. - * @param writer The table writer for the target format. - * @param samplingOptions The sampling options to use. - * @return The map of resources that have been selected. - */ - abstract fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map - - /** - * Convert the resource states table for the trace. - * - * @param trace The trace to convert. - * @param writer The table writer for the target format. - * @param selected The set of virtual machines that have been selected. - * @return The number of rows written. - */ - abstract fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int - - /** - * A resource in the resource table. - */ - data class Resource( - val id: String, - val startTime: Instant, - val stopTime: Instant, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Double - ) - } - - /** - * Default implementation of [TraceConverter]. - */ - private class DefaultTraceConverter : TraceConverter("default") { - /** - * The logger instance for the converter. 
- */ - private val logger = KotlinLogging.logger {} - - /** - * The interval at which the samples where taken. - */ - private val SAMPLE_INTERVAL = Duration.ofMinutes(5) - - /** - * The difference in CPU usage for the algorithm to cascade samples. - */ - private val SAMPLE_CASCADE_DIFF = 0.1 - - override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { - val random = samplingOptions?.let { Random(it.seed) } - val samplingFraction = samplingOptions?.fraction ?: 1.0 - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - - var hasNextRow = reader.nextRow() - val selectedVms = mutableMapOf() - - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) - val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) - val memUsageCol = reader.resolve(RESOURCE_STATE_MEM_USAGE) - - while (hasNextRow) { - var id: String - var cpuCount = 0 - var cpuCapacity = 0.0 - var memCapacity = 0.0 - var memUsage = 0.0 - var startTime = Long.MAX_VALUE - var stopTime = Long.MIN_VALUE - - do { - id = reader.get(idCol) as String - - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - startTime = min(startTime, timestamp) - stopTime = max(stopTime, timestamp) - - cpuCount = max(cpuCount, reader.getInt(cpuCountCol)) - cpuCapacity = max(cpuCapacity, reader.getDouble(cpuCapacityCol)) - memCapacity = max(memCapacity, reader.getDouble(memCapacityCol)) - if (memUsageCol > 0) { - memUsage = max(memUsage, reader.getDouble(memUsageCol)) - } - - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_ID)) - - // Sample only a fraction of the VMs - if (random != null && random.nextDouble() > samplingFraction) { - continue - } - - logger.info { "Selecting VM $id" } - - val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval - val stopInstant = Instant.ofEpochMilli(stopTime) - - selectedVms.computeIfAbsent(id) { - Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, max(memCapacity, memUsage)) - } - - writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) - writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) - writer.endRow() - } - - return selectedVms - } - - override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - val sampleInterval = SAMPLE_INTERVAL.toMillis() - - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) - - var hasNextRow = reader.nextRow() - var count = 0 - - while (hasNextRow) { - val id = reader.get(idCol) as String - val resource = selected[id] - if (resource == null) { - hasNextRow = reader.nextRow() - continue - } - - val cpuCount = reader.getInt(cpuCountCol) - val cpuUsage = reader.getDouble(cpuUsageCol) - - val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - var timestamp: Long = startTimestamp - var duration: Long = sampleInterval - - // Attempt to cascade further samples into one 
if they share the same CPU usage - while (reader.nextRow().also { hasNextRow = it }) { - val shouldCascade = id == reader.get(idCol) && - abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && - cpuCount == reader.getInt(cpuCountCol) - - // Check whether the next sample can be cascaded with the current sample: - // (1) The VM identifier of both samples matches - // (2) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF` - // (3) The CPU count of both samples is identical - if (!shouldCascade) { - break - } - - val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - - // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` - if ((nextTimestamp - timestamp) > sampleInterval) { - break - } - - duration += nextTimestamp - timestamp - timestamp = nextTimestamp - } - - writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) - writer.endRow() - - count++ - } - - return count - } - } - - /** - * Implementation of [TraceConverter] for the Azure trace format. - */ - private class AzureTraceConverter : TraceConverter("default") { - /** - * The logger instance for the converter. - */ - private val logger = KotlinLogging.logger {} - - /** - * CPU capacity of the machines used by Azure. - */ - private val CPU_CAPACITY = 2500.0 - - /** - * The interval at which the samples where taken. - */ - private val SAMPLE_INTERVAL = Duration.ofMinutes(5) - - /** - * The difference in CPU usage for the algorithm to cascade samples. - */ - private val SAMPLE_CASCADE_DIFF = 0.1 - - override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { - val random = samplingOptions?.let { Random(it.seed) } - val samplingFraction = samplingOptions?.fraction ?: 1.0 - val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - - val idCol = reader.resolve(RESOURCE_ID) - val startTimeCol = reader.resolve(RESOURCE_START_TIME) - val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) - - val selectedVms = mutableMapOf() - - while (reader.nextRow()) { - // Sample only a fraction of the VMs - if (random != null && random.nextDouble() > samplingFraction) { - continue - } - - val id = reader.get(idCol) as String - val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli() - val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli() - val cpuCount = reader.getInt(cpuCountCol) - val memCapacity = reader.getDouble(memCapacityCol) - - logger.info { "Selecting VM $id" } - - val startInstant = Instant.ofEpochMilli(startTime) - val stopInstant = Instant.ofEpochMilli(stopTime) - val cpuCapacity = cpuCount * CPU_CAPACITY - - selectedVms.computeIfAbsent(id) { - Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, memCapacity) - } - - writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) - writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) - writer.endRow() - } - - return selectedVms - } - - override fun convertResourceStates(trace: Trace, writer: 
TableWriter, selected: Map): Int { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - val states = HashMap() - val sampleInterval = SAMPLE_INTERVAL.toMillis() - - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE_PCT) - - var count = 0 - - while (reader.nextRow()) { - val id = reader.get(idCol) as String - val resource = selected[id] ?: continue - - val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz - val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - val delta = (timestamp - state.time) - - // Check whether the next sample can be cascaded with the current sample: - // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`) - // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL` - if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) { - state.time = timestamp - state.duration += delta - continue - } - - state.write(writer) - // Reset the state fields - state.time = timestamp - state.duration = sampleInterval - // Count write - count++ - } - - for ((_, state) in states) { - state.write(writer) - count++ - } - - return count - } - - private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) { - @JvmField var time: Long = resource.startTime.toEpochMilli() - private var lastWrite: Long = Long.MIN_VALUE - - fun write(writer: TableWriter) { - // Check whether this timestamp was already written - if (lastWrite == time) { - return - } - lastWrite = time - - writer.startRow() - writer.set(RESOURCE_ID, resource.id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) - writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) - writer.endRow() - } - } - } -} diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt new file mode 100644 index 00000000..b480484b --- /dev/null +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceTools") +package org.opendc.trace.tools + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.core.subcommands + +/** + * A script for querying and manipulating workload traces supported by OpenDC. + */ +fun main(args: Array): Unit = TraceToolsCli().main(args) + +/** + * The primary [CliktCommand] for the trace tools offered by OpenDC. + */ +class TraceToolsCli : CliktCommand(name = "trace-tools") { + init { + subcommands(QueryCommand()) + subcommands(ConvertCommand()) + } + + override fun run() {} +} -- cgit v1.2.3 From ee057033b4c534fdd3e8a9d2320d75035d30f27a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 1 May 2022 21:16:43 +0200 Subject: refactor(trace/parquet): Support custom ReadSupport implementations This change updates the `LocalParquetReader` implementation to support custom `ReadSupport` implementations, so we do not have to rely on the Avro implementation necessarily. --- .../export/parquet/ParquetHostDataWriter.kt | 6 +-- .../export/parquet/ParquetServerDataWriter.kt | 6 +-- .../export/parquet/ParquetServiceDataWriter.kt | 2 +- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 2 +- opendc-trace/opendc-trace-parquet/build.gradle.kts | 2 +- .../kotlin/org/opendc/trace/util/avro/AvroUtils.kt | 44 ++++++++++++++++++ .../org/opendc/trace/util/parquet/AvroUtils.kt | 44 ------------------ .../trace/util/parquet/LocalParquetReader.kt | 53 ++++++++++++++++------ 8 files changed, 92 insertions(+), 67 deletions(-) create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt delete mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 2b7cac8f..72dbba90 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.UUID_SCHEMA +import org.opendc.trace.util.avro.optional import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 144b6624..aac6115f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ 
b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.UUID_SCHEMA +import org.opendc.trace.util.avro.optional import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index ec8a2b65..2db30bc4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -26,7 +26,7 @@ import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import java.io.File /** diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 36a1b4a0..1a15c7b3 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,7 +34,7 @@ import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 302c0b14..e6415586 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(libs.parquet) { exclude(group = "org.apache.hadoop") } - runtimeOnly(libs.hadoop.common) { + api(libs.hadoop.common) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt new file mode 100644 index 00000000..a655d39f --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the 
"Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("AvroUtils") +package org.opendc.trace.util.avro + +import org.apache.avro.LogicalTypes +import org.apache.avro.Schema + +/** + * Schema for UUID type. + */ +public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) + +/** + * Schema for timestamp type. + */ +public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) + +/** + * Helper function to make a [Schema] field optional. + */ +public fun Schema.optional(): Schema { + return Schema.createUnion(Schema.create(Schema.Type.NULL), this) +} diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt deleted file mode 100644 index 086b900b..00000000 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("AvroUtils") -package org.opendc.trace.util.parquet - -import org.apache.avro.LogicalTypes -import org.apache.avro.Schema - -/** - * Schema for UUID type. - */ -public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - -/** - * Schema for timestamp type. 
- */ -public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) - -/** - * Helper function to make a [Schema] field optional. - */ -public fun Schema.optional(): Schema { - return Schema.createUnion(Schema.create(Schema.Type.NULL), this) -} diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index ef9eaeb3..bb2bb10d 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile import java.io.File import java.io.IOException @@ -32,11 +33,15 @@ import java.nio.file.Path import kotlin.io.path.isDirectory /** - * A helper class to read Parquet files. + * A helper class to read Parquet files from the filesystem. + * + * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. + * @param factory Function to construct a [ParquetReader] for a local [InputFile]. */ -public class LocalParquetReader(path: Path) : AutoCloseable { +public class LocalParquetReader(path: Path, + private val factory: (InputFile) -> ParquetReader = avro()) : AutoCloseable { /** * The input files to process. */ @@ -93,20 +98,40 @@ public class LocalParquetReader(path: Path) : AutoCloseable { private fun initReader() { reader?.close() - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null + try { + this.reader = if (filesIterator.hasNext()) { + factory(filesIterator.next()) + } else { + null + } + } catch (e: Throwable) { + this.reader = null + throw e } } - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader { - return AvroParquetReader - .builder(input) - .disableCompatibility() - .build() + public companion object { + /** + * A factory for reading Avro Parquet files. + */ + public fun avro(): (InputFile) -> ParquetReader { + return { input -> + AvroParquetReader + .builder(input) + .disableCompatibility() + .build() + } + } + + /** + * A factory for reading Parquet files with custom [ReadSupport]. + */ + public fun custom(readSupport: ReadSupport): (InputFile) -> ParquetReader { + return { input -> + object : ParquetReader.Builder(input) { + override fun getReadSupport(): ReadSupport = readSupport + }.build() + } + } } } -- cgit v1.2.3 From ea5e79fc77072e6151ee7952581b97e35a2027fb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 1 May 2022 22:54:08 +0200 Subject: perf(trace/opendc): Read records using low-level API This change updates the OpenDC VM format reader implementation to use the low-level record reading APIs provided by the `parquet-mr` library for improved performance. Previously, we used the `parquet-avro` library to read/write Avro records in Parquet format, but that library carries considerable overhead. 
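To make the switch concrete, the following is a minimal sketch of the read-side hooks that parquet-mr expects (a ReadSupport producing a RecordMaterializer). It assumes a single UTF8 `id` column and is not the ResourceReadSupport/ResourceRecordMaterializer added below, which cover the full resource schema:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.InitContext
    import org.apache.parquet.hadoop.api.ReadSupport
    import org.apache.parquet.io.api.Binary
    import org.apache.parquet.io.api.Converter
    import org.apache.parquet.io.api.GroupConverter
    import org.apache.parquet.io.api.PrimitiveConverter
    import org.apache.parquet.io.api.RecordMaterializer
    import org.apache.parquet.schema.MessageType
    import org.apache.parquet.schema.MessageTypeParser

    // Sketch: materialize the `id` column of every record as a String,
    // without going through Avro's GenericRecord representation.
    class IdReadSupport : ReadSupport<String>() {
        private val schema: MessageType = MessageTypeParser.parseMessageType(
            "message resource { required binary id (UTF8); }"
        )

        // Project the file schema down to the columns we materialize.
        override fun init(context: InitContext): ReadSupport.ReadContext =
            ReadSupport.ReadContext(schema)

        override fun prepareForRead(
            configuration: Configuration,
            keyValueMetaData: Map<String, String>,
            fileSchema: MessageType,
            readContext: ReadSupport.ReadContext
        ): RecordMaterializer<String> = object : RecordMaterializer<String>() {
            private var id: String? = null
            private val root = object : GroupConverter() {
                private val idConverter = object : PrimitiveConverter() {
                    override fun addBinary(value: Binary) {
                        id = value.toStringUsingUTF8()
                    }
                }
                override fun getConverter(fieldIndex: Int): Converter = idConverter
                override fun start() { id = null }
                override fun end() {}
            }
            override fun getCurrentRecord(): String = checkNotNull(id)
            override fun getRootConverter(): GroupConverter = root
        }
    }

On the write side, the new ResourceWriteSupport and ResourceStateWriteSupport classes play the analogous role for ParquetWriter.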
--- .../trace/opendc/OdcVmResourceStateTableReader.kt | 74 ++++-------- .../trace/opendc/OdcVmResourceStateTableWriter.kt | 124 +++++++++++---------- .../trace/opendc/OdcVmResourceTableReader.kt | 65 +++-------- .../trace/opendc/OdcVmResourceTableWriter.kt | 117 ++++++++++--------- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 65 +++-------- .../org/opendc/trace/opendc/parquet/Resource.kt | 37 ++++++ .../trace/opendc/parquet/ResourceReadSupport.kt | 111 ++++++++++++++++++ .../opendc/parquet/ResourceRecordMaterializer.kt | 107 ++++++++++++++++++ .../opendc/trace/opendc/parquet/ResourceState.kt | 34 ++++++ .../opendc/parquet/ResourceStateReadSupport.kt | 105 +++++++++++++++++ .../parquet/ResourceStateRecordMaterializer.kt | 102 +++++++++++++++++ .../opendc/parquet/ResourceStateWriteSupport.kt | 105 +++++++++++++++++ .../trace/opendc/parquet/ResourceWriteSupport.kt | 114 +++++++++++++++++++ .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 60 ++++++++++ .../trace/util/parquet/LocalParquetReader.kt | 6 +- .../trace/util/parquet/LocalParquetWriter.kt | 55 +++++++++ 16 files changed, 1022 insertions(+), 259 deletions(-) create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt create mode 100644 opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index b82da888..7a01b881 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,38 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant /** * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { +internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. 
- */ - private var hasInitializedColumns = false + private var record: ResourceState? = null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record[AVRO_COL_ID].toString() - COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) - COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) - COL_CPU_COUNT -> getInt(index) - COL_CPU_USAGE -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") + COL_ID -> record.id + COL_TIMESTAMP -> record.timestamp + COL_DURATION -> record.duration + COL_CPU_COUNT -> record.cpuCount + COL_CPU_USAGE -> record.cpuUsage + else -> throw IllegalArgumentException("Invalid column index $index") } } override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_COUNT -> record.cpuCount + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_USAGE -> record.cpuUsage + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun toString(): String = "OdcVmResourceStateTableReader" - /** - * Initialize the columns for the reader based on [schema]. 
- */ - private fun initColumns(schema: Schema) { - try { - AVRO_COL_ID = schema.getField("id").pos() - AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - AVRO_COL_DURATION = schema.getField("duration").pos() - AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - } catch (e: NullPointerException) { - // This happens when the field we are trying to access does not exist - throw IllegalArgumentException("Invalid schema", e) - } - } - - private var AVRO_COL_ID = -1 - private var AVRO_COL_TIMESTAMP = -1 - private var AVRO_COL_DURATION = -1 - private var AVRO_COL_CPU_COUNT = -1 - private var AVRO_COL_CPU_USAGE = -1 - private val COL_ID = 0 private val COL_TIMESTAMP = 1 private val COL_DURATION = 2 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 01b9750c..97af5b59 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -22,84 +22,85 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableWriter( - private val writer: ParquetWriter, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource state schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _timestamp: Instant = Instant.MIN + private var _duration: Duration = Duration.ZERO + private var _cpuCount: Int = 0 + private var _cpuUsage: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _timestamp = Instant.MIN + _duration = Duration.ZERO + _cpuCount = 0 + _cpuUsage = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - - val record = builder.build() - val id = record[COL_ID] as String - val timestamp = record[COL_TIMESTAMP] as Long + check(_isActive) { "No active row" } + _isActive = false - check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - writer.write(builder.build()) + writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)) - lastId = id - lastTimestamp = timestamp + lastId = _id + lastTimestamp = _timestamp } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() - RESOURCE_STATE_DURATION -> schema.getField("duration").pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - - builder.set( - fields[index], - when (index) { - COL_TIMESTAMP -> (value as Instant).toEpochMilli() - COL_DURATION -> (value as Duration).toMillis() - else -> value - } - ) + check(_isActive) { "No active row" } + + when (index) { + COL_ID -> _id = value as String + COL_TIMESTAMP -> _timestamp = value as Instant + COL_DURATION -> _duration = value as Duration + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_USAGE -> _cpuUsage = value as Double + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_USAGE -> _cpuUsage = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter( * Last column values that are used to check for correct partitioning. */ private var lastId: String? 
= null - private var lastTimestamp: Long = Long.MIN_VALUE - - /** - * Columns with special behavior. - */ - private val COL_ID = resolve(RESOURCE_ID) - private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) - private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) + private var lastTimestamp: Instant = Instant.MAX + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 4909e70e..6102332f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,37 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Instant /** - * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format. + * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader) : TableReader { +internal class OdcVmResourceTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: Resource? 
= null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record[AVRO_COL_ID].toString() - COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) - COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_ID -> record.id + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime COL_CPU_COUNT -> getInt(index) COL_CPU_CAPACITY -> getDouble(index) COL_MEM_CAPACITY -> getDouble(index) @@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record[AVRO_COL_CPU_COUNT] as Int + COL_CPU_COUNT -> record.cpuCount else -> throw IllegalArgumentException("Invalid column") } } @@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0 - COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() + COL_CPU_CAPACITY -> record.cpuCapacity + COL_MEM_CAPACITY -> record.memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceTableWriter(private val writer: ParquetWriter) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _startTime: Instant = Instant.MIN + private var _stopTime: Instant = Instant.MIN + private var _cpuCount: Int = 0 + private var _cpuCapacity: Double = Double.NaN + private var _memCapacity: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = Double.NaN + _memCapacity = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - writer.write(builder.build()) + check(_isActive) { "No active row" } + _isActive = false + writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos() - RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - builder.set( - fields[index], - when (index) { - COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() - COL_MEM_CAPACITY -> (value as Double).roundToLong() - else -> value - } - ) + check(_isActive) { "No active row" } + when (index) { + COL_ID -> _id = value as String + COL_START_TIME -> _startTime = value as Instant + COL_STOP_TIME -> _stopTime = value as Instant + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_CAPACITY -> _cpuCapacity = value as Double + COL_MEM_CAPACITY -> _memCapacity = value as Double + else -> throw IllegalArgumentException("Invalid column index $index") + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_CAPACITY -> _cpuCapacity = value + COL_MEM_CAPACITY -> _memCapacity = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter( writer.close() } - /** - * Columns with special behavior. 
- */ - private val COL_START_TIME = resolve(RESOURCE_START_TIME) - private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) - private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 1a15c7b3..155f8cf3 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -22,19 +22,19 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecord -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport +import org.opendc.trace.opendc.parquet.ResourceWriteSupport import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.LocalParquetWriter import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files @@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat { override fun newReader(path: Path, table: String): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet")) + val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport())) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet")) + val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport())) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { @@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat { override fun newWriter(path: Path, table: String): TableWriter { return when (table) { TABLE_RESOURCES -> { - val schema = RESOURCES_SCHEMA - val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("meta.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - 
OdcVmResourceTableWriter(writer, schema) + OdcVmResourceTableWriter(writer) } TABLE_RESOURCE_STATES -> { - val schema = RESOURCE_STATES_SCHEMA - val writer = AvroParquetWriter.builder(LocalOutputFile(path.resolve("trace.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - OdcVmResourceStateTableWriter(writer, schema) + OdcVmResourceStateTableWriter(writer) } TABLE_INTERFERENCE_GROUPS -> { val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) @@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } - - public companion object { - /** - * Schema for the resources table in the trace. - */ - @JvmStatic - public val RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("resource") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() - .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_capacity") - .requiredLong("mem_capacity") - .endRecord() - - /** - * Schema for the resource states table in the trace. - */ - @JvmStatic - public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("resource_state") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredLong("duration") - .requiredInt("cpu_count") - .requiredDouble("cpu_usage") - .endRecord() - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt new file mode 100644 index 00000000..c6db45b5 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. 
+ */ +internal data class Resource( + val id: String, + val startTime: Instant, + val stopTime: Instant, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double +) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt new file mode 100644 index 00000000..47cce914 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [Resource] objects. + */ +internal class ResourceReadSupport : ReadSupport() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer = ResourceRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submissionTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("endTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("maxCores"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("requiredMemory"), + ) + .named("resource") + + /** + * Parquet read schema (version 2.1) for the "resources" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + + /** + * Parquet read schema for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0 + .union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt new file mode 100644 index 00000000..3adb0709 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [Resource] records. + */ +internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var _id = "" + private var _startTime = Instant.MIN + private var _stopTime = Instant.MIN + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _memCapacity = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + _id = value.toStringUsingUTF8() + } + } + "start_time", "submissionTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _startTime = Instant.ofEpochMilli(value) + } + } + "stop_time", "endTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _stopTime = Instant.ofEpochMilli(value) + } + } + "cpu_count", "maxCores" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _cpuCount = value + } + } + "cpu_capacity" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _cpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _memCapacity = value + } + + override fun addLong(value: Long) { + _memCapacity = value.toDouble() + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = 0.0 + _memCapacity = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt new file mode 100644 index 00000000..9ad58764 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import java.time.Duration +import java.time.Instant + +internal class ResourceState( + val id: String, + val timestamp: Instant, + val duration: Duration, + val cpuCount: Int, + val cpuUsage: Double +) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt new file mode 100644 index 00000000..17840ceb --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [ResourceState] objects. + */ +internal class ResourceStateReadSupport : ReadSupport() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer = ResourceStateRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cores"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpuUsage") + ) + .named("resource_state") + + /** + * Parquet read schema (version 2.1) for the "resource states" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + + /** + * Parquet read schema for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt new file mode 100644 index 00000000..f8b0c3c2 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant + +/** + * A [RecordMaterializer] for [ResourceState] records. + */ +internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var _id = "" + private var _timestamp = Instant.MIN + private var _duration = Duration.ZERO + private var _cpuCount = 0 + private var _cpuUsage = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + _id = value.toStringUsingUTF8() + } + } + "timestamp", "time" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _timestamp = Instant.ofEpochMilli(value) + } + } + "duration" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _duration = Duration.ofMillis(value) + } + } + "cpu_count", "cores" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _cpuCount = value + } + } + "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _cpuUsage = value + } + } + "flops" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + // Ignore to support v1 format + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _timestamp = Instant.MIN + _duration = Duration.ZERO + _cpuCount = 0 + _cpuUsage = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt new file mode 100644 index 00000000..e2f3df31 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* + +/** + * Support for writing [Resource] instances to Parquet format. + */ +internal class ResourceStateWriteSupport : WriteSupport() { + /** + * The current active record consumer. 
+ */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ResourceState) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: ResourceState) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("timestamp", 1) + consumer.addLong(record.timestamp.toEpochMilli()) + consumer.endField("timestamp", 1) + + consumer.startField("duration", 2) + consumer.addLong(record.duration.toMillis()) + consumer.endField("duration", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_usage", 4) + consumer.addDouble(record.cpuUsage) + consumer.endField("cpu_usage", 4) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resource states" table in the trace. + */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt new file mode 100644 index 00000000..14cadabb --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import kotlin.math.roundToLong + +/** + * Support for writing [Resource] instances to Parquet format. + */ +internal class ResourceWriteSupport : WriteSupport() { + /** + * The current active record consumer. + */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Resource) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: Resource) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("start_time", 1) + consumer.addLong(record.startTime.toEpochMilli()) + consumer.endField("start_time", 1) + + consumer.startField("stop_time", 2) + consumer.addLong(record.stopTime.toEpochMilli()) + consumer.endField("stop_time", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_capacity", 4) + consumer.addDouble(record.cpuCapacity) + consumer.endField("cpu_capacity", 4) + + consumer.startField("mem_capacity", 5) + consumer.addLong(record.memCapacity.roundToLong()) + consumer.endField("mem_capacity", 5) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resources" table in the trace. + */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index c8742624..dec0fef9 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.conv.* +import java.nio.file.Files import java.nio.file.Paths +import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. 
@@ -78,6 +80,37 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourcesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_START_TIME, Instant.EPOCH) + writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) + writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { @@ -94,6 +127,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourceStatesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCE_STATES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCE_STATES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index bb2bb10d..3e6f19a2 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -40,8 +40,10 @@ import kotlin.io.path.isDirectory * @param path The path to the Parquet file or directory to read. * @param factory Function to construct a [ParquetReader] for a local [InputFile]. */ -public class LocalParquetReader(path: Path, - private val factory: (InputFile) -> ParquetReader = avro()) : AutoCloseable { +public class LocalParquetReader( + path: Path, + private val factory: (InputFile) -> ParquetReader = avro() +) : AutoCloseable { /** * The input files to process. 
*/ diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt new file mode 100644 index 00000000..b5eb1deb --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.OutputFile +import java.nio.file.Path + +/** + * Helper class for writing Parquet records to local disk. + */ +public class LocalParquetWriter { + /** + * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations. + */ + public class Builder internal constructor( + output: OutputFile, + private val writeSupport: WriteSupport + ) : ParquetWriter.Builder>(output) { + override fun self(): Builder = this + + override fun getWriteSupport(conf: Configuration): WriteSupport = writeSupport + } + + public companion object { + /** + * Create a [Builder] instance that writes a Parquet file at the specified [path]. + */ + @JvmStatic + public fun builder(path: Path, writeSupport: WriteSupport): Builder = + Builder(LocalOutputFile(path), writeSupport) + } +} -- cgit v1.2.3 From a1374a63f81fafc5da565072bae2ecae2e0fed28 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 1 May 2022 23:33:39 +0200 Subject: refactor(compute): Do not use Avro when exporting experiment data This change updates the `ParquetDataWriter` class to not use the `parquet-avro` library for exporting experiment data, but instead to use the low-level APIs to directly write the data in Parquet format. 
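As a rough sketch of the new write path (the `Sample` record and `SampleWriteSupport` class below are hypothetical and only illustrate the pattern used by the actual writers in this change), a writer now consists of a Parquet schema plus a `write` method that drives the `RecordConsumer` directly:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.WriteSupport
    import org.apache.parquet.io.api.Binary
    import org.apache.parquet.io.api.RecordConsumer
    import org.apache.parquet.schema.LogicalTypeAnnotation
    import org.apache.parquet.schema.MessageType
    import org.apache.parquet.schema.PrimitiveType
    import org.apache.parquet.schema.Types

    /** A hypothetical export record, used only for illustration. */
    internal data class Sample(val name: String, val value: Double)

    /** A minimal [WriteSupport] that emits one Parquet row per [Sample]. */
    internal class SampleWriteSupport : WriteSupport<Sample>() {
        private lateinit var consumer: RecordConsumer

        override fun init(configuration: Configuration): WriteContext = WriteContext(SCHEMA, emptyMap())

        override fun prepareForWrite(recordConsumer: RecordConsumer) {
            consumer = recordConsumer
        }

        override fun write(record: Sample) {
            consumer.startMessage()

            consumer.startField("name", 0)
            consumer.addBinary(Binary.fromCharSequence(record.name))
            consumer.endField("name", 0)

            consumer.startField("value", 1)
            consumer.addDouble(record.value)
            consumer.endField("value", 1)

            consumer.endMessage()
        }

        companion object {
            /** Write schema with a required string column and a required double column. */
            val SCHEMA: MessageType = Types.buildMessage()
                .addFields(
                    Types.required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("name"),
                    Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named("value")
                )
                .named("sample")
        }
    }

A concrete data writer then passes such a `WriteSupport` to `ParquetDataWriter` and calls `write(record)`; the background writer thread handles batching and flushing without building intermediate `GenericRecord`s.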
--- .../opendc-compute-workload/build.gradle.kts | 2 + .../workload/export/parquet/ParquetDataWriter.kt | 32 ++- .../export/parquet/ParquetHostDataWriter.kt | 214 ++++++++++++++++----- .../export/parquet/ParquetServerDataWriter.kt | 196 ++++++++++++++----- .../export/parquet/ParquetServiceDataWriter.kt | 121 +++++++++--- .../compute/workload/export/parquet/Utils.kt | 38 ++++ .../workload/export/parquet/HostDataWriterTest.kt | 79 ++++++++ .../export/parquet/ServerDataWriterTest.kt | 73 +++++++ .../export/parquet/ServiceDataWriterTest.kt | 67 +++++++ 9 files changed, 675 insertions(+), 147 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 9ced95a7..319b2ae3 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -39,4 +39,6 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 84387bbc..c854d874 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -23,14 +23,12 @@ package org.opendc.compute.workload.export.parquet import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalOutputFile +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,10 +36,13 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. + * + * @param path The path to the file to write the data to. + * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. */ public abstract class ParquetDataWriter( path: File, - private val schema: Schema, + private val writeSupport: WriteSupport, bufferSize: Int = 4096 ) : AutoCloseable { /** @@ -52,7 +53,7 @@ public abstract class ParquetDataWriter( /** * The queue of records to process. 
*/ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -64,15 +65,15 @@ public abstract class ParquetDataWriter( */ private val writerThread = thread(start = false, name = this.toString()) { val writer = let { - val builder = AvroParquetWriter.builder(LocalOutputFile(path)) - .withSchema(schema) + val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withCompressionCodec(CompressionCodecName.ZSTD) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) buildWriter(builder) } val queue = queue - val buf = mutableListOf() + val buf = mutableListOf() var shouldStop = false try { @@ -101,15 +102,10 @@ public abstract class ParquetDataWriter( /** * Build the [ParquetWriter] used to write the Parquet files. */ - protected open fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { return builder.build() } - /** - * Convert the specified [data] into a Parquet record. - */ - protected abstract fun convert(builder: GenericRecordBuilder, data: T) - /** * Write the specified metrics to the database. */ @@ -119,9 +115,7 @@ public abstract class ParquetDataWriter( throw IllegalStateException("Writer thread failed", exception) } - val builder = GenericRecordBuilder(schema) - convert(builder, data) - queue.put(builder.build()) + queue.put(data) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 72dbba90..0d5b6b34 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -22,81 +22,189 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA -import org.opendc.trace.util.avro.UUID_SCHEMA -import org.opendc.trace.util.avro.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [HostTableReader]s. 
*/ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { + ParquetDataWriter(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { return builder .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "host-writer" - builder["host_id"] = data.host.id + /** + * A [WriteSupport] implementation for a [HostTableReader]. + */ + private class HostDataWriteSupport : WriteSupport() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.host.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.host.memCapacity + override fun write(record: HostTableReader) { + write(recordConsumer, record) + } - builder["power_total"] = data.powerTotal + private fun write(consumer: RecordConsumer, data: HostTableReader) { + consumer.startMessage() - builder["guests_terminated"] = data.guestsTerminated - builder["guests_running"] = data.guestsRunning - builder["guests_error"] = data.guestsError - builder["guests_invalid"] = data.guestsInvalid - } + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) - override fun toString(): String = "host-writer" + consumer.startField("host_id", 1) + consumer.addBinary(UUID.fromString(data.host.id).toBinary()) + consumer.endField("host_id", 1) + + consumer.startField("uptime", 2) + consumer.addLong(data.uptime) + consumer.endField("uptime", 2) + + consumer.startField("downtime", 3) + consumer.addLong(data.downtime) + consumer.endField("downtime", 3) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 4) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 4) + } + + consumer.startField("cpu_count", 5) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 5) + + consumer.startField("cpu_limit", 6) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 6) + + consumer.startField("cpu_time_active", 7) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 7) + + consumer.startField("cpu_time_idle", 8) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 8) + + consumer.startField("cpu_time_steal", 9) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 9) + + consumer.startField("cpu_time_lost", 10) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 10) + + consumer.startField("mem_limit", 11) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_limit", 11) + + consumer.startField("power_total", 12) + consumer.addDouble(data.powerTotal) + 
consumer.endField("power_total", 12) + + consumer.startField("guests_terminated", 13) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 13) + + consumer.startField("guests_running", 14) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 14) + + consumer.startField("guests_error", 15) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 15) + + consumer.startField("guests_invalid", 16) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 16) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("host") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .requiredDouble("power_total") - .requiredInt("guests_terminated") - .requiredInt("guests_running") - .requiredInt("guests_error") - .requiredInt("guests_invalid") - .endRecord() + /** + * The schema of the host data. + */ + val SCHEMA: MessageType = Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_total"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + ) + .named("host") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index aac6115f..5d11629b 100644 --- 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -22,73 +22,177 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA -import org.opendc.trace.util.avro.UUID_SCHEMA -import org.opendc.trace.util.avro.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { + ParquetDataWriter(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { + override fun buildWriter(builder: LocalParquetWriter.Builder): ParquetWriter { return builder .withDictionaryEncoding("server_id", true) .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "server-writer" - builder["server_id"] = data.server.id - builder["host_id"] = data.host?.id + /** + * A [WriteSupport] implementation for a [ServerTableReader]. 
+ */ + private class ServerDataWriteSupport : WriteSupport() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - builder["boot_time"] = data.bootTime?.toEpochMilli() - builder["provision_time"] = data.provisionTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.server.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.server.memCapacity - } + override fun write(record: ServerTableReader) { + write(recordConsumer, record) + } - override fun toString(): String = "server-writer" + private fun write(consumer: RecordConsumer, data: ServerTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("server_id", 1) + consumer.addBinary(UUID.fromString(data.server.id).toBinary()) + consumer.endField("server_id", 1) + + val hostId = data.host?.id + if (hostId != null) { + consumer.startField("host_id", 2) + consumer.addBinary(UUID.fromString(hostId).toBinary()) + consumer.endField("host_id", 2) + } + + consumer.startField("uptime", 3) + consumer.addLong(data.uptime) + consumer.endField("uptime", 3) + + consumer.startField("downtime", 4) + consumer.addLong(data.downtime) + consumer.endField("downtime", 4) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 5) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 5) + } + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 6) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 6) + } + + consumer.startField("cpu_count", 7) + consumer.addInteger(data.server.cpuCount) + consumer.endField("cpu_count", 7) + + consumer.startField("cpu_limit", 8) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_time_active", 9) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 9) + + consumer.startField("cpu_time_idle", 10) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 10) + + consumer.startField("cpu_time_steal", 11) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 11) + + consumer.startField("cpu_time_lost", 12) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 12) + + consumer.startField("mem_limit", 13) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_limit", 13) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("server") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("server_id").type(UUID_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA.optional()).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - 
.requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .endRecord() + /** + * The schema of the server data. + */ + val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("server_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit") + ) + .named("server") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 2db30bc4..5ad3b95e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,45 +22,108 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecordBuilder +import io.opentelemetry.context.ContextKey.named +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import java.io.File /** * A Parquet event writer for [ServiceTableReader]s. 
 */
 public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
-    ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
-
-    override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
-        builder["timestamp"] = data.timestamp.toEpochMilli()
-        builder["hosts_up"] = data.hostsUp
-        builder["hosts_down"] = data.hostsDown
-        builder["servers_pending"] = data.serversPending
-        builder["servers_active"] = data.serversActive
-        builder["attempts_success"] = data.attemptsSuccess
-        builder["attempts_failure"] = data.attemptsFailure
-        builder["attempts_error"] = data.attemptsError
-    }
+    ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {

     override fun toString(): String = "service-writer"

+    /**
+     * A [WriteSupport] implementation for a [ServiceTableReader].
+     */
+    private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+        lateinit var recordConsumer: RecordConsumer
+
+        override fun init(configuration: Configuration): WriteContext {
+            return WriteContext(SCHEMA, emptyMap())
+        }
+
+        override fun prepareForWrite(recordConsumer: RecordConsumer) {
+            this.recordConsumer = recordConsumer
+        }
+
+        override fun write(record: ServiceTableReader) {
+            write(recordConsumer, record)
+        }
+
+        private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+            consumer.startMessage()
+
+            consumer.startField("timestamp", 0)
+            consumer.addLong(data.timestamp.toEpochMilli())
+            consumer.endField("timestamp", 0)
+
+            consumer.startField("hosts_up", 1)
+            consumer.addInteger(data.hostsUp)
+            consumer.endField("hosts_up", 1)
+
+            consumer.startField("hosts_down", 2)
+            consumer.addInteger(data.hostsDown)
+            consumer.endField("hosts_down", 2)
+
+            consumer.startField("servers_pending", 3)
+            consumer.addInteger(data.serversPending)
+            consumer.endField("servers_pending", 3)
+
+            consumer.startField("servers_active", 4)
+            consumer.addInteger(data.serversActive)
+            consumer.endField("servers_active", 4)
+
+            consumer.startField("attempts_success", 5)
+            consumer.addInteger(data.attemptsSuccess)
+            consumer.endField("attempts_success", 5)
+
+            consumer.startField("attempts_failure", 6)
+            consumer.addInteger(data.attemptsFailure)
+            consumer.endField("attempts_failure", 6)
+
+            consumer.startField("attempts_error", 7)
+            consumer.addInteger(data.attemptsError)
+            consumer.endField("attempts_error", 7)
+
+            consumer.endMessage()
+        }
+    }
+
     private companion object {
-        private val SCHEMA: Schema = SchemaBuilder
-            .record("service")
-            .namespace("org.opendc.telemetry.compute")
-            .fields()
-            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
-            .requiredInt("hosts_up")
-            .requiredInt("hosts_down")
-            .requiredInt("servers_pending")
-            .requiredInt("servers_active")
-            .requiredInt("attempts_success")
-            .requiredInt("attempts_failure")
-            .requiredInt("attempts_error")
-            .endRecord()
+        private val SCHEMA: MessageType = Types.buildMessage()
+            .addFields(
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT64)
+                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("timestamp"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("hosts_up"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("hosts_down"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("servers_pending"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("servers_active"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("attempts_success"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+
.named("attempts_failure"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_error"), + ) + .named("service") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt new file mode 100644 index 00000000..9921f5b8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.parquet.io.api.Binary +import java.nio.ByteBuffer +import java.util.UUID + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal fun UUID.toBinary(): Binary { + val bb = ByteBuffer.wrap(ByteArray(16)) + bb.putLong(mostSignificantBits) + bb.putLong(leastSignificantBits) + return Binary.fromConstantByteBuffer(bb) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt new file mode 100644 index 00000000..dae03513 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.HostTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetHostDataWriter] + */ +class HostDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : HostTableReader { + override val timestamp: Instant = Instant.now() + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val guestsTerminated: Int = 0 + override val guestsRunning: Int = 0 + override val guestsError: Int = 0 + override val guestsInvalid: Int = 0 + override val cpuLimit: Double = 4096.0 + override val cpuUsage: Double = 1.0 + override val cpuDemand: Double = 1.0 + override val cpuUtilization: Double = 0.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val powerUsage: Double = 1.0 + override val powerTotal: Double = 1.0 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt new file mode 100644 index 00000000..280f5ef8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.ServerInfo +import org.opendc.telemetry.compute.table.ServerTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServerDataWriter] + */ +class ServerDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServerTableReader { + override val timestamp: Instant = Instant.now() + override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096) + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val cpuLimit: Double = 4096.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val provisionTime: Instant = timestamp + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt new file mode 100644 index 00000000..7ffa7186 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.ServiceTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServiceDataWriter] + */ +class ServiceDataWriterTest { + /** + * The path to write the data file to. 
+ */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. + */ + private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServiceTableReader { + override val timestamp: Instant = Instant.now() + override val hostsUp: Int = 1 + override val hostsDown: Int = 0 + override val serversPending: Int = 1 + override val serversActive: Int = 1 + override val attemptsSuccess: Int = 1 + override val attemptsFailure: Int = 0 + override val attemptsError: Int = 0 + }) + } + } +} -- cgit v1.2.3 From ae0b12987dca93c05e44341963511ac8cf802793 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 11:06:31 +0200 Subject: refactor(trace/wtf): Do not use Avro when reading WTF trace This change updates the Workflow Trace format implementation in OpenDC to not use the `parquet-avro` library for exporting experiment data, but instead to use the low-level APIs to directly read the data from Parquet. This reduces the amount of conversions necessary before reaching the OpenDC trace API. --- .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 81 +++------- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 5 +- .../kotlin/org/opendc/trace/wtf/parquet/Task.kt | 42 ++++++ .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 99 +++++++++++++ .../trace/wtf/parquet/TaskRecordMaterializer.kt | 165 +++++++++++++++++++++ 5 files changed, 329 insertions(+), 63 deletions(-) create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 1e332aca..f0db78b7 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -22,38 +22,30 @@ package org.opendc.trace.wtf -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant +import org.opendc.trace.wtf.parquet.Task /** * A [TableReader] implementation for the WTF format. */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader) : TableReader { +internal class WtfTaskTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: Task? 
= null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader (record[AVRO_COL_ID] as Long).toString() - COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString() - COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long) - COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long) - COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long) + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + COL_SUBMIT_TIME -> record.submitTime + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList).map { it["item"].toString() }.toSet() - COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList).map { it["item"].toString() }.toSet() + COL_PARENTS -> record.parents + COL_CHILDREN -> record.children else -> throw IllegalArgumentException("Invalid column") } } @@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader (record[AVRO_COL_REQ_NCPUS] as Double).toInt() - COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int - COL_USER_ID -> record[AVRO_COL_USER_ID] as Int + COL_REQ_NCPUS -> record.requestedCpus + COL_GROUP_ID -> record.groupId + COL_USER_ID -> record.userId else -> throw IllegalArgumentException("Invalid column") } } @@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) + val factory = LocalParquetReader.custom(TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt new file mode 100644 index 00000000..71557f96 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import java.time.Duration +import java.time.Instant + +/** + * A task in the Workflow Trace Format. + */ +internal data class Task( + val id: String, + val workflowId: String, + val submitTime: Instant, + val waitTime: Duration, + val runtime: Duration, + val requestedCpus: Int, + val groupId: Int, + val userId: Int, + val parents: Set, + val children: Set +) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt new file mode 100644 index 00000000..0017a4a9 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [Task] objects. + */ +internal class TaskReadSupport : ReadSupport() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema for the "tasks" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("workflow_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("ts_submit"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("wait_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("runtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("resource_amount_requested"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("user_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("group_id"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list") + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list") + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + ) + .named("task") + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt new file mode 100644 index 00000000..08da5eaf --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant +import kotlin.math.roundToInt + +/** + * A [RecordMaterializer] for [Task] records. + */ +internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. 
+ */ + private var _id = "" + private var _workflowId = "" + private var _submitTime = Instant.MIN + private var _waitTime = Duration.ZERO + private var _runtime = Duration.ZERO + private var _requestedCpus = 0 + private var _groupId = 0 + private var _userId = 0 + private var _parents = mutableSetOf() + private var _children = mutableSetOf() + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _id = value.toString() + } + } + "workflow_id" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _workflowId = value.toString() + } + } + "ts_submit" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _submitTime = Instant.ofEpochMilli(value) + } + } + "wait_time" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _waitTime = Duration.ofMillis(value) + } + } + "runtime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _runtime = Duration.ofMillis(value) + } + } + "resource_amount_requested" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _requestedCpus = value.roundToInt() + } + } + "group_id" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _groupId = value + } + } + "user_id" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _userId = value + } + } + "children" -> RelationConverter(_children) + "parents" -> RelationConverter(_parents) + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _workflowId = "" + _submitTime = Instant.MIN + _waitTime = Duration.ZERO + _runtime = Duration.ZERO + _requestedCpus = 0 + _groupId = 0 + _userId = 0 + _parents.clear() + _children.clear() + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Task = Task( + _id, + _workflowId, + _submitTime, + _waitTime, + _runtime, + _requestedCpus, + _groupId, + _userId, + _parents.toSet(), + _children.toSet() + ) + + override fun getRootConverter(): GroupConverter = root + + /** + * Helper class to convert parent and child relations and add them to [relations]. + */ + private class RelationConverter(private val relations: MutableSet) : GroupConverter() { + private val entryConverter = object : PrimitiveConverter() { + override fun addLong(value: Long) { + relations.add(value.toString()) + } + } + + private val listConverter = object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return entryConverter + } + + override fun start() {} + override fun end() {} + } + + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return listConverter + } + + override fun start() {} + override fun end() {} + } +} -- cgit v1.2.3 From 9411845b3f26536a1e6ea40504e396f19d25a09a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 11:44:48 +0200 Subject: refactor(trace/parquet): Drop dependency on Avro This change updates the Parquet support library in OpenDC to not rely on Avro, but instead interface directly with Parquet's reading and writing functionality, providing less overhead. 
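
As a rough usage sketch (not part of this patch), reading now works by
passing a ReadSupport directly to LocalParquetReader. The IntReadSupport
class, its single-column schema and the file path below are made up for the
example, mirroring the pattern used by the ReadSupport implementations
elsewhere in this series:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.InitContext
    import org.apache.parquet.hadoop.api.ReadSupport
    import org.apache.parquet.io.api.Converter
    import org.apache.parquet.io.api.GroupConverter
    import org.apache.parquet.io.api.PrimitiveConverter
    import org.apache.parquet.io.api.RecordMaterializer
    import org.apache.parquet.schema.MessageType
    import org.opendc.trace.util.parquet.LocalParquetReader
    import java.nio.file.Paths

    /** Materializes single-column INT32 Parquet records into plain Kotlin Ints. */
    class IntReadSupport : ReadSupport<Int>() {
        override fun init(context: InitContext): ReadContext = ReadContext(context.fileSchema)

        override fun prepareForRead(
            configuration: Configuration,
            keyValueMetaData: Map<String, String>,
            fileSchema: MessageType,
            readContext: ReadContext
        ): RecordMaterializer<Int> = object : RecordMaterializer<Int>() {
            private var current = 0
            private val field = object : PrimitiveConverter() {
                override fun addInt(value: Int) { current = value }
            }
            private val root = object : GroupConverter() {
                override fun getConverter(fieldIndex: Int): Converter = field
                override fun start() {}
                override fun end() {}
            }
            override fun getCurrentRecord(): Int = current
            override fun getRootConverter(): GroupConverter = root
        }
    }

    fun main() {
        // Read all records from a local Parquet file or partitioned directory.
        val reader = LocalParquetReader(Paths.get("example.parquet"), IntReadSupport())
        try {
            while (true) {
                val value = reader.read() ?: break // read() returns null once all input files are exhausted
                println(value)
            }
        } finally {
            reader.close()
        }
    }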
--- gradle/libs.versions.toml | 2 +- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 4 +- .../kotlin/org/opendc/trace/util/avro/AvroUtils.kt | 44 -------- .../trace/util/parquet/LocalParquetReader.kt | 39 ++----- .../org/opendc/trace/util/parquet/ParquetTest.kt | 118 +++++++++++++++------ .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 3 +- 6 files changed, 103 insertions(+), 107 deletions(-) delete mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a5c0f184..b05af368 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -70,7 +70,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } -parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" } +parquet = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" } config = { module = "com.typesafe:config", version.ref = "config" } # Quarkus diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 155f8cf3..b455a2cf 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat { override fun newReader(path: Path, table: String): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport())) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport()) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport())) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport()) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt deleted file mode 100644 index a655d39f..00000000 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("AvroUtils") -package org.opendc.trace.util.avro - -import org.apache.avro.LogicalTypes -import org.apache.avro.Schema - -/** - * Schema for UUID type. - */ -public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)) - -/** - * Schema for timestamp type. - */ -public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)) - -/** - * Helper function to make a [Schema] field optional. - */ -public fun Schema.optional(): Schema { - return Schema.createUnion(Schema.create(Schema.Type.NULL), this) -} diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index 3e6f19a2..eef83956 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -22,7 +22,6 @@ package org.opendc.trace.util.parquet -import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile @@ -38,11 +37,11 @@ import kotlin.io.path.isDirectory * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. - * @param factory Function to construct a [ParquetReader] for a local [InputFile]. + * @param readSupport Helper class to perform conversion from Parquet to [T]. */ public class LocalParquetReader( path: Path, - private val factory: (InputFile) -> ParquetReader = avro() + private val readSupport: ReadSupport ) : AutoCloseable { /** * The input files to process. @@ -64,7 +63,7 @@ public class LocalParquetReader( /** * Construct a [LocalParquetReader] for the specified [file]. */ - public constructor(file: File) : this(file.toPath()) + public constructor(file: File, readSupport: ReadSupport) : this(file.toPath(), readSupport) /** * Read a single entry in the Parquet file. @@ -102,7 +101,7 @@ public class LocalParquetReader( try { this.reader = if (filesIterator.hasNext()) { - factory(filesIterator.next()) + createReader(filesIterator.next()) } else { null } @@ -112,28 +111,12 @@ public class LocalParquetReader( } } - public companion object { - /** - * A factory for reading Avro Parquet files. - */ - public fun avro(): (InputFile) -> ParquetReader { - return { input -> - AvroParquetReader - .builder(input) - .disableCompatibility() - .build() - } - } - - /** - * A factory for reading Parquet files with custom [ReadSupport]. 
- */ - public fun custom(readSupport: ReadSupport): (InputFile) -> ParquetReader { - return { input -> - object : ParquetReader.Builder(input) { - override fun getReadSupport(): ReadSupport = readSupport - }.build() - } - } + /** + * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport]. + */ + private fun createReader(input: InputFile): ParquetReader { + return object : ParquetReader.Builder(input) { + override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport + }.build() } } diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index 8ef4d1fb..be354319 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -22,36 +22,81 @@ package org.opendc.trace.util.parquet -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Types import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals -import java.io.File import java.nio.file.FileAlreadyExistsException +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.nio.file.Path /** * Test suite for the Parquet helper classes. 
*/ internal class ParquetTest { - private val schema = SchemaBuilder - .record("test") - .namespace("org.opendc.format.util") - .fields() - .name("field").type().intType().noDefault() - .endRecord() + private lateinit var path: Path - private lateinit var file: File + private val schema = Types.buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .named("field") + ) + .named("test") + private val writeSupport = object : WriteSupport() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(schema, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Int) { + val consumer = recordConsumer + + consumer.startMessage() + consumer.startField("field", 0) + consumer.addInteger(record) + consumer.endField("field", 0) + consumer.endMessage() + } + } + + private val readSupport = object : ReadSupport() { + override fun init( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType + ): ReadContext = ReadContext(fileSchema) + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer = TestRecordMaterializer() + } /** - * Setup the test + * Set up the test */ @BeforeEach fun setUp() { - file = File.createTempFile("opendc", "parquet") + path = Files.createTempFile("opendc", "parquet") } /** @@ -59,7 +104,7 @@ internal class ParquetTest { */ @AfterEach fun tearDown() { - file.delete() + Files.deleteIfExists(path) } /** @@ -68,29 +113,24 @@ internal class ParquetTest { @Test fun testSmoke() { val n = 4 - val writer = AvroParquetWriter.builder(LocalOutputFile(file)) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path, writeSupport) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() try { repeat(n) { i -> - val record = GenericData.Record(schema) - record.put("field", i) - writer.write(record) + writer.write(i) } } finally { writer.close() } - val reader = AvroParquetReader.builder(LocalInputFile(file)) - .build() - + val reader = LocalParquetReader(path, readSupport) var counter = 0 try { while (true) { val record = reader.read() ?: break - assertEquals(counter++, record.get("field")) + assertEquals(counter++, record) } } finally { reader.close() @@ -105,9 +145,7 @@ internal class ParquetTest { @Test fun testOverwrite() { assertThrows { - AvroParquetWriter.builder(LocalOutputFile(file)) - .withSchema(schema) - .build() + LocalParquetWriter.builder(path, writeSupport).build() } } @@ -116,10 +154,30 @@ internal class ParquetTest { */ @Test fun testNonExistent() { - file.delete() + Files.deleteIfExists(path) assertThrows { - AvroParquetReader.builder(LocalInputFile(file)) - .build() + LocalParquetReader(path, readSupport) + } + } + + private class TestRecordMaterializer : RecordMaterializer() { + private var current: Int = 0 + private val fieldConverter = object : PrimitiveConverter() { + override fun addInt(value: Int) { + current = value + } + } + private val root = object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return fieldConverter + } + override fun start() {} + override fun end() {} } + + override fun getCurrentRecord(): Int = current + + override fun getRootConverter(): GroupConverter = root } } diff --git 
a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index aae71c58..d6e42c8c 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -66,8 +66,7 @@ public class WtfTraceFormat : TraceFormat { override fun newReader(path: Path, table: String): TableReader { return when (table) { TABLE_TASKS -> { - val factory = LocalParquetReader.custom(TaskReadSupport()) - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") -- cgit v1.2.3 From 670cd279ea7789e07b6d778a21fdec68347ab305 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 14:17:55 +0200 Subject: feat(trace/api): Add support for projecting tables This change adds support for projecting certain columns of a table. This enables faster reading for tables with high number of columns. Currently, we support projection in the Parquet-based workload formats. Other formats are text-based and will probably not benefit much from projection. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 6 ++-- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 4 ++- .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 4 ++- .../org/opendc/trace/azure/AzureTraceFormat.kt | 2 +- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 4 +-- .../trace/bitbrains/BitbrainsExTraceFormat.kt | 2 +- .../opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 2 +- .../trace/bitbrains/BitbrainsExTraceFormatTest.kt | 2 +- .../trace/bitbrains/BitbrainsTraceFormatTest.kt | 4 +-- .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 2 +- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 6 ++-- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 6 ++-- .../trace/opendc/parquet/ResourceReadSupport.kt | 40 ++++++++++++++++++++-- .../opendc/parquet/ResourceStateReadSupport.kt | 38 ++++++++++++++++++-- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 21 ++++++++---- .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 2 +- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 2 +- .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 2 +- .../trace/wfformat/WfFormatTraceFormatTest.kt | 4 +-- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 4 +-- .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 39 +++++++++++++++++++-- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 2 +- 22 files changed, 159 insertions(+), 39 deletions(-) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index b0181cbc..05d0234a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -42,9 +42,11 @@ public interface Table { public val partitionKeys: List> /** - * Open a [TableReader] for this table. + * Open a [TableReader] for a projection of this table. + * + * @param projection The list of columns to fetch from the table or `null` if no projection is performed. */ - public fun newReader(): TableReader + public fun newReader(projection: List>? = null): TableReader /** * Open a [TableWriter] for this table. 
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index 24551edb..b848e19a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
     override val partitionKeys: List<TableColumn<*>>
         get() = details.partitionKeys

-    override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+    override fun newReader(projection: List<TableColumn<*>>?): TableReader {
+        return trace.format.newReader(trace.path, name, projection)
+    }

     override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)

diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index f2e610db..47761e0f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,6 +22,7 @@
 package org.opendc.trace.spi

+import org.opendc.trace.TableColumn
 import org.opendc.trace.TableReader
 import org.opendc.trace.TableWriter
 import java.nio.file.Path
@@ -68,10 +69,11 @@ public interface TraceFormat {
      *
      * @param path The path to the trace to open.
      * @param table The name of the table to open a [TableReader] for.
+     * @param projection The list of [TableColumn]s to project or `null` if no projection is performed.
      * @throws IllegalArgumentException If [table] does not exist.
      * @return A [TableReader] instance for the table.
      */
-    public fun newReader(path: Path, table: String): TableReader
+    public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader

     /**
      * Open a [TableWriter] for the specified [table].
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 8e3e60cc..73978990 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 56f9a940..263d26ce 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -57,7 +57,7 @@ class AzureTraceFormatTest { @Test fun testResources() { val path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -71,7 +71,7 @@ class AzureTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 11d21a04..82e454ad 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index e1e7604a..a374e951 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { val vms = Files.walk(path, 1) diff --git 
a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 77429e3e..c944cb98 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/vm.txt") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 9309beb1..841801e6 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest { @Test fun testResources() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 63688523..8d9eab82 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 9bf28ad7..411d45d0 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class GwfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -73,7 +73,7 @@ internal class GwfTraceFormatTest { @Test fun testReadingRowWithDependencies() { val path = 
Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) // Move to row 7 for (x in 1..6) @@ -85,7 +85,7 @@ internal class GwfTraceFormatTest { { assertEquals("7", reader.get(TASK_ID)) }, { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, ) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b455a2cf..d45910c6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport()) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport()) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 47cce914..0d70446d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport : ReadSupport() { +internal class ResourceReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. 
+ */ + private val fieldMap = mapOf>( + "id" to RESOURCE_ID, + "submissionTime" to RESOURCE_START_TIME, + "start_time" to RESOURCE_START_TIME, + "endTime" to RESOURCE_STOP_TIME, + "stop_time" to RESOURCE_STOP_TIME, + "maxCores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpu_capacity" to RESOURCE_CPU_CAPACITY, + "requiredMemory" to RESOURCE_MEM_CAPACITY, + "mem_capacity" to RESOURCE_MEM_CAPACITY, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 17840ceb..97aa00b2 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport : ReadSupport() { +internal class ResourceStateReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping from field names to [TableColumn]s. 
+ */ + private val fieldMap = mapOf>( + "id" to RESOURCE_ID, + "time" to RESOURCE_STATE_TIMESTAMP, + "timestamp" to RESOURCE_STATE_TIMESTAMP, + "duration" to RESOURCE_STATE_DURATION, + "cores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpuUsage" to RESOURCE_STATE_CPU_USAGE, + "cpu_usage" to RESOURCE_STATE_CPU_USAGE, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index dec0fef9..1f4f6195 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, { assertEquals("1023", reader.get(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, @@ -95,7 +96,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -177,7 +186,7 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroupsEmpty() { val path = Paths.get("src/test/resources/trace-v2.0") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = 
format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index b969f3ef..916a5eca 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 1698f644..c3d644e8 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class SwfTraceFormatTest { @Test fun testReader() { val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index bc175b58..8db4c169 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 710de88e..4a8b2792 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -62,7 +62,7 @@ class WfFormatTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/trace.json") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -89,7 +89,7 @@ class WfFormatTraceFormatTest { @Test fun testTableReaderFull() { val path = Paths.get("src/test/resources/trace.json") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertDoesNotThrow { while (reader.nextRow()) { 
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index d6e42c8c..e71253ac 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 0017a4a9..8e7325de 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Task] objects. + * + * @param projection The projection of the table to read. */ -internal class TaskReadSupport : ReadSupport() { +internal class TaskReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping of table columns to their Parquet column names. 
+ */ + private val colMap = mapOf, String>( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id" + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0f0e422d..c0eb3f08 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -61,7 +61,7 @@ class WtfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/wtf-trace") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS)) assertAll( { assertTrue(reader.nextRow()) }, -- cgit v1.2.3 From e4d3a8add5388182cf7a12b1099678a0b769b106 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 14:28:28 +0200 Subject: perf(trace/calcite): Add support for projections This change adds support for projections in the Apache Calcite integration with OpenDC. This enables faster queries when only a subset of the table columns is selected. --- .../main/kotlin/org/opendc/trace/calcite/TraceTable.kt | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index af521297..8c571b82 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -35,10 +35,7 @@ import org.apache.calcite.rel.logical.LogicalTableModify import org.apache.calcite.rel.type.RelDataType import org.apache.calcite.rel.type.RelDataTypeFactory import org.apache.calcite.rex.RexNode -import org.apache.calcite.schema.ModifiableTable -import org.apache.calcite.schema.ScannableTable -import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.schema.Table +import org.apache.calcite.schema.* import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName import java.time.Duration @@ -50,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractQueryableTable(Array::class.java), - ScannableTable, + ProjectableFilterableTable, ModifiableTable, InsertableTable { private var rowType: RelDataType? 
= null
@@ -65,11 +62,15 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
         return rowType
     }

-    override fun scan(root: DataContext): Enumerable<Array<Any?>> {
+    override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> {
+        // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite
+        // assumes that they are declined and will perform the filtering itself.
+
+        val projection = projects?.map { table.columns[it] }
         val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root)
         return object : AbstractEnumerable<Array<Any?>>() {
             override fun enumerator(): Enumerator<Array<Any?>> =
-                TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+                TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
         }
     }
-- cgit v1.2.3
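
A rough usage sketch of the projection support introduced in this series, reading only two columns of the resources table through the OpenDC VM trace format. The format, table and column constants are the ones exercised by the tests above; the trace path is hypothetical, and passing `null` as the projection keeps the old read-everything behaviour.

    import org.opendc.trace.conv.RESOURCE_ID
    import org.opendc.trace.conv.RESOURCE_START_TIME
    import org.opendc.trace.conv.TABLE_RESOURCES
    import org.opendc.trace.opendc.OdcVmTraceFormat
    import java.nio.file.Paths

    fun main() {
        val format = OdcVmTraceFormat()
        // Hypothetical path to a trace in the OpenDC VM format (meta.parquet + trace.parquet).
        val path = Paths.get("src/test/resources/trace-v2.1")

        // Only the projected columns are materialized from the Parquet file;
        // passing null instead reads all columns, as before this change.
        val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
        try {
            while (reader.nextRow()) {
                println("${reader.get(RESOURCE_ID)} started at ${reader.get(RESOURCE_START_TIME)}")
            }
        } finally {
            reader.close()
        }
    }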
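
On the Calcite side, the sketch below shows how the projection pushdown is exercised from SQL, assuming a Calcite model file that registers the OpenDC trace schema; the model path and the schema, table and column identifiers are illustrative assumptions rather than names taken from the code. Because TraceTable now implements ProjectableFilterableTable, selecting a subset of columns lets Calcite pass the projected column indices to scan(), so only those columns are read from the underlying trace.

    import java.sql.DriverManager

    fun main() {
        // Hypothetical Calcite model that exposes the workload trace as schema "trace".
        val url = "jdbc:calcite:model=src/test/resources/model.json"

        DriverManager.getConnection(url).use { connection ->
            connection.createStatement().use { statement ->
                // Only two columns are selected, so Calcite pushes the projection down
                // into TraceTable.scan() instead of reading every column of the table.
                val rs = statement.executeQuery("SELECT \"id\", \"start_time\" FROM \"trace\".\"resources\"")
                while (rs.next()) {
                    println("${rs.getString("id")} started at ${rs.getTimestamp("start_time")}")
                }
            }
        }
    }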