-rw-r--r--  opendc-common/build.gradle.kts | 4
-rw-r--r--  opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt | 131
-rw-r--r--  opendc-common/src/main/resources/log4j2.xml | 43
-rw-r--r--  opendc-compute/opendc-compute-telemetry/build.gradle.kts | 3
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt | 192
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt | 195
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt | 153
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt | 95
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt | 109
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt | 280
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt | 224
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt | 139
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md | 70
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt | 3
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt | 7
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt | 3
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt | 1
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt | 3
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt | 1
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt | 19
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt | 14
-rw-r--r--  opendc-trace/opendc-trace-parquet/build.gradle.kts | 8
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt (renamed from opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt) | 3
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt | 174
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt | 120
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt (renamed from opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt) | 23
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt | 146
27 files changed, 1463 insertions, 700 deletions
diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts
index 2dd35d83..aeb9bc4d 100644
--- a/opendc-common/build.gradle.kts
+++ b/opendc-common/build.gradle.kts
@@ -36,5 +36,9 @@ dependencies {
implementation(libs.kotlin.logging)
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion")
+ api(libs.log4j.core)
+ api(libs.log4j.slf4j)
+ api(libs.kotlin.logging)
+
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt b/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt
new file mode 100644
index 00000000..ee2f828a
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/logger/Logger.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.logger
+
+import mu.KotlinLogging
+import org.slf4j.Logger
+
+/**
+ * @return an slf4j [Logger] named after the simple name of the calling class.
+ * It can also be used in a companion object to limit the number of logger instances.
+ *
+ * ```kotlin
+ * class Foo {
+ * val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo")
+ *
+ * companion object {
+ * val LOG by logger() // Same as: KotlinLogging.logger(name = "Foo")
+ * val LOG by logger("smth") // Same as: KotlinLogging.logger(name = "smth")
+ * }
+ * }
+ * ```
+ */
+public fun <T : Any> T.logger(name: String? = null): Lazy<Logger> {
+ return lazy {
+ KotlinLogging.logger(
+ name
+ ?: runCatching { this::class.java.enclosingClass.simpleName }
+ .getOrNull()
+ ?: "unknown",
+ )
+ }
+}
+
+/**
+ * Logs [msg] with WARN level and returns null.
+ * ```kotlin
+ * // Replace
+ * LOG.warn(<msg>)
+ * return null
+ * // With
+ * return LOG.warnAndNull(<msg>)
+ * ```
+ */
+public fun Logger.warnAndNull(msg: String): Nothing? {
+ this.warn(msg)
+ return null
+}
+
+/**
+ * Logs [msg] with ERROR level and returns null.
+ * ```kotlin
+ * // Replace
+ * LOG.error(<msg>)
+ * return null
+ * // With
+ * return LOG.errAndNull(<msg>)
+ * ```
+ */
+public fun Logger.errAndNull(msg: String): Nothing? {
+ this.error(msg)
+ return null
+}
+
+/**
+ * Logs [msg] with *WARN* level and returns [obj].
+ *
+ * ```kotlin
+ * // Replace
+ * if (<key> !in map) {
+ *     LOG.warn("<warn-message>")
+ *     return <default-value>
+ * } else map[<key>]
+ * // With
+ * map[<key>] ?: LOG.withWarn(<default-value>, "<warn-message>")
+ * ```
+ */
+public fun <T> Logger.withWarn(
+ obj: T,
+ msg: String,
+): T {
+ this.warn(msg)
+ return obj
+}
+
+/**
+ * Logs [msg] with *ERROR* level and returns [obj].
+ */
+public fun <T> Logger.withErr(
+ obj: T,
+ msg: String,
+): T {
+ this.error(msg)
+ return obj
+}
+
+/**
+ * Logs [msg] with *INFO* level on a new line.
+ * ```console
+ *
+ * 09:28:08.830 [INFO] ScenariosSpec -
+ * | === Compute Export Config ===
+ * | Host Fields (columns) : timestamp,
+ * ...
+ * // Instead of
+ * 09:28:08.830 [INFO] ScenariosSpec - | === Compute Export Config ===
+ * | Host Fields (columns) : timestamp,
+ * ```
+ */
+public fun Logger.infoNewLine(msg: String) {
+ info("\n" + msg)
+}
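
For illustration, a minimal sketch of how the helpers above are meant to be combined. Only the extensions from `org.opendc.common.logger` are real; the `ScenarioSettings` class and its values are hypothetical:

```kotlin
import org.opendc.common.logger.logger
import org.opendc.common.logger.warnAndNull
import org.opendc.common.logger.withWarn

class ScenarioSettings(private val values: Map<String, Int>) {
    // Logs the warning and falls back to 4096 only when the key is missing.
    fun bufferSizeOrDefault(): Int =
        values["bufferSize"] ?: LOG.withWarn(4096, "bufferSize not configured, defaulting to 4096")

    // Warn-and-return-null collapsed into a single expression.
    fun valueOrNull(key: String): Int? =
        values[key] ?: LOG.warnAndNull("no value named '$key'")

    private companion object {
        // Lazily resolved slf4j logger named "ScenarioSettings".
        val LOG by logger()
    }
}
```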
diff --git a/opendc-common/src/main/resources/log4j2.xml b/opendc-common/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..07389360
--- /dev/null
+++ b/opendc-common/src/main/resources/log4j2.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="warn">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
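
As a quick illustration of the layout configured above, a hedged sketch of a warning emitted through the `logger()` helper. The logger name and message are made up; the message is handled by the Root logger at `warn` level, and the rendered timestamp is only indicative:

```kotlin
import org.opendc.common.logger.logger

object LayoutDemo {
    private val LOG by logger("ScenariosSpec")

    fun emit() {
        // With the PatternLayout above, this is rendered on the console roughly as:
        // 09:28:08.830 [WARN ] ScenariosSpec - falling back to all loaded columns
        LOG.warn("falling back to all loaded columns")
    }
}
```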
diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
index 10f7c610..e8692449 100644
--- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts
+++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
@@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation"
// Build configuration
plugins {
`kotlin-library-conventions`
+ kotlin("plugin.serialization") version "1.9.22"
}
dependencies {
api(projects.opendcCompute.opendcComputeApi)
+ api(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
+ implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon")))
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt
new file mode 100644
index 00000000..02e3e0bb
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.buildClassSerialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.encoding.encodeStructure
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.JsonElement
+import kotlinx.serialization.json.jsonObject
+import org.opendc.common.logger.logger
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ColListSerializer
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.columnSerializer
+
+/**
+ * Aggregates the settings needed to customize the output
+ * parquet files for compute workloads.
+ *
+ * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
+ * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file.
+ * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
+ */
+@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
+public data class ComputeExportConfig(
+ public val hostExportColumns: Set<ExportColumn<HostTableReader>>,
+ public val serverExportColumns: Set<ExportColumn<ServerTableReader>>,
+ public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>,
+) {
+ public constructor(
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>,
+ serverExportColumns: Collection<ExportColumn<ServerTableReader>>,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>,
+ ) : this(
+ hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS,
+ serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS,
+ serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS,
+ )
+
+ /**
+ * @return formatted string representing the export config.
+ */
+ public fun fmt(): String =
+ """
+ | === Compute Export Config ===
+ | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')}
+ """.trimIndent()
+
+ public companion object {
+ internal val LOG by logger()
+
+ /**
+ * Force the jvm to load the default [ExportColumn]s relevant to compute export,
+ * so that they are available for deserialization.
+ */
+ public fun loadDfltColumns() {
+ DfltHostExportColumns
+ DfltServerExportColumns
+ DfltServiceExportColumns
+ }
+
+ /**
+ * Config that includes all columns defined in [DfltHostExportColumns],
+ * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded
+ * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader].
+ */
+ public val ALL_COLUMNS: ComputeExportConfig by lazy {
+ loadDfltColumns()
+ ComputeExportConfig(
+ hostExportColumns = ExportColumn.getAllLoadedColumns(),
+ serverExportColumns = ExportColumn.getAllLoadedColumns(),
+ serviceExportColumns = ExportColumn.getAllLoadedColumns(),
+ )
+ }
+
+ /**
+ * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs.
+ *
+         * This serializer makes use of reified column serializers for the 3 column-set properties.
+ */
+ internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> {
+ override val descriptor: SerialDescriptor =
+ buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") {
+ element(
+ "hostExportColumns",
+ ListSerializer(columnSerializer<HostTableReader>()).descriptor,
+ )
+ element(
+ "serverExportColumns",
+ ListSerializer(columnSerializer<ServerTableReader>()).descriptor,
+ )
+ element(
+ "serviceExportColumns",
+ ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
+ )
+ }
+
+ override fun deserialize(decoder: Decoder): ComputeExportConfig {
+ val jsonDec =
+ (decoder as? JsonDecoder) ?: let {
+ // Basically a recursive call with a JsonDecoder.
+ return json.decodeFromString(decoder.decodeString().trim('"'))
+ }
+
+ // Loads the default columns so that they are available for deserialization.
+ loadDfltColumns()
+ val elem = jsonDec.decodeJsonElement().jsonObject
+
+ val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
+ val serverFields: List<ExportColumn<ServerTableReader>> = elem["serverExportColumns"].toFieldList()
+ val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()
+
+ return ComputeExportConfig(
+ hostExportColumns = hostFields,
+ serverExportColumns = serverFields,
+ serviceExportColumns = serviceFields,
+ )
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: ComputeExportConfig,
+ ) {
+ encoder.encodeStructure(descriptor) {
+ encodeSerializableElement(
+ descriptor,
+ 0,
+ ColListSerializer(columnSerializer<HostTableReader>()),
+ value.hostExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ descriptor,
+ 1,
+ ColListSerializer(columnSerializer<ServerTableReader>()),
+ value.serverExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ descriptor,
+ 2,
+ ColListSerializer(columnSerializer<ServiceTableReader>()),
+ value.serviceExportColumns.toList(),
+ )
+ }
+ }
+ }
+ }
+}
+
+private val json = Json { ignoreUnknownKeys = true }
+
+private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> =
+ this?.let {
+ json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it)
+ }?.ifEmpty {
+ ComputeExportConfig.LOG.warn(
+ "deserialized list of export columns for exportable ${T::class.simpleName} " +
+ "produced empty list, falling back to all loaded columns",
+ )
+ ExportColumn.getAllLoadedColumns<T>()
+ } ?: ExportColumn.getAllLoadedColumns<T>()
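
A hedged sketch of how such a config could be read from JSON. It assumes each list entry identifies a column by its parquet field name; the exact wire format is defined by `columnSerializer`/`ColListSerializer`, which are not part of this diff, and the surrounding function is illustrative:

```kotlin
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json

// Assumed JSON shape: every list entry refers to a column by its parquet field name.
private val exportConfigJson =
    """
    {
        "hostExportColumns": ["timestamp", "cpu_usage", "power_draw", "energy_usage"],
        "serverExportColumns": ["timestamp", "server_id", "uptime"],
        "serviceExportColumns": ["timestamp", "servers_active", "attempts_failure"]
    }
    """.trimIndent()

fun loadExportConfig(): ComputeExportConfig {
    // The custom serializer loads the default columns (loadDfltColumns()) before decoding,
    // so the names above can be resolved; extra JSON keys are simply ignored.
    val config = Json { ignoreUnknownKeys = true }.decodeFromString<ComputeExportConfig>(exportConfigJson)
    // The base columns (timestamp, timestamp_absolute) are re-added by the secondary
    // constructor, so they end up in the output even if omitted in the JSON.
    return config
}
```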
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt
new file mode 100644
index 00000000..68b5a664
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to resolve the ambiguity of field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows all the fields to be loaded at once just by referencing
+ * this object, so that these columns can be deserialized. Additional fields can be
+ * added from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltHostExportColumns
+ * ...
+ * ```
+ */
+public object DfltHostExportColumns {
+ public val TIMESTAMP: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOST_ID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { Binary.fromString(it.host.id) }
+
+ public val HOST_NAME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_name"),
+ ) { Binary.fromString(it.host.name) }
+
+ public val CPU_COUNT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("cpu_count"),
+ ) { it.host.cpuCount }
+
+ public val MEM_CAPACITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.host.memCapacity }
+
+ public val GUESTS_TERMINATED: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_terminated"),
+ ) { it.guestsTerminated }
+
+ public val GUESTS_RUNNING: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_running"),
+ ) { it.guestsRunning }
+
+ public val GUESTS_ERROR: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_error"),
+ ) { it.guestsError }
+
+ public val GUESTS_INVALID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_invalid"),
+ ) { it.guestsInvalid }
+
+ public val CPU_LIMIT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_usage"),
+ ) { it.cpuUsage }
+
+ public val CPU_DEMAND: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_demand"),
+ ) { it.cpuDemand }
+
+ public val CPU_UTILIZATION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_utilization"),
+ ) { it.cpuUtilization }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val POWER_DRAW: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("power_draw"),
+ ) { it.powerDraw }
+
+ public val ENERGY_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("energy_usage"),
+ ) { it.energyUsage }
+
+ public val CARBON_INTENSITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("carbon_intensity"),
+ ) { it.carbonIntensity }
+
+ public val CARBON_EMISSION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("carbon_emission"),
+ ) { it.carbonEmission }
+
+ public val UP_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val BOOT_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
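
Columns do not have to live in this object. A hedged sketch of a project-specific column defined elsewhere, using only getters that `HostTableReader` already exposes; the object and column names are made up:

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Types
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.trace.util.parquet.exporter.ExportColumn

object CustomHostExportColumns {
    // CPU capacity left unused in the sample (cpu_limit - cpu_usage).
    val CPU_HEADROOM: ExportColumn<HostTableReader> =
        ExportColumn(
            field = Types.required(DOUBLE).named("cpu_headroom"),
        ) { it.cpuLimit - it.cpuUsage }
}
```

As with the defaults, such an object only needs to be referenced once, so that the JVM loads it, before an export config naming `cpu_headroom` is deserialized.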
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt
new file mode 100644
index 00000000..91d6c9bf
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to resolve the ambiguity of field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows all the fields to be loaded at once just by referencing
+ * this object, so that these columns can be deserialized. Additional fields can be
+ * added from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltServerExportColumns
+ * ...
+ * ```
+ */
+public object DfltServerExportColumns {
+ public val TIMESTAMP: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val SERVER_ID: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_id"),
+ ) { Binary.fromString(it.server.id) }
+
+ public val HOST_ID: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.optional(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { it.host?.id?.let { Binary.fromString(it) } }
+
+ public val SERVER_NAME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_name"),
+ ) { Binary.fromString(it.server.name) }
+
+ public val CPU_COUNT: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("cpu_count"),
+ ) { it.server.cpuCount }
+
+ public val MEM_CAPACITY: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.server.memCapacity }
+
+ public val CPU_LIMIT: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val UP_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val PROVISION_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("provision_time"),
+ ) { it.provisionTime?.toEpochMilli() }
+
+ public val BOOT_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt
new file mode 100644
index 00000000..89396545
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to resolve the ambiguity of field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows all the fields to be loaded at once just by referencing
+ * this object, so that these columns can be deserialized. Additional fields can be
+ * added from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltServiceExportColumns
+ * ...
+ * ```
+ */
+public object DfltServiceExportColumns {
+ public val TIMESTAMP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOSTS_UP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("hosts_up"),
+ ) { it.hostsUp }
+
+ public val SERVERS_PENDING: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("servers_pending"),
+ ) { it.serversPending }
+
+ public val SERVERS_ACTIVE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("servers_active"),
+ ) { it.serversActive }
+
+ public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_success"),
+ ) { it.attemptsSuccess }
+
+    public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_failure"),
+ ) { it.attemptsFailure }
+
+ public val ATTEMPTS_ERROR: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_error"),
+ ) { it.attemptsError }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
index 1c910497..6bea4cc2 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -26,45 +26,100 @@ import org.opendc.compute.telemetry.ComputeMonitor
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.compute.telemetry.table.ServerTableReader
import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.Exporter
import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter =
- ParquetServerDataWriter(
- File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- private val hostWriter =
- ParquetHostDataWriter(
- File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- private val serviceWriter =
- ParquetServiceDataWriter(
- File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- override fun record(reader: ServerTableReader) {
- serverWriter.write(reader)
+public class ParquetComputeMonitor(
+ private val hostExporter: Exporter<HostTableReader>,
+ private val serverExporter: Exporter<ServerTableReader>,
+ private val serviceExporter: Exporter<ServiceTableReader>,
+) : ComputeMonitor, AutoCloseable {
+ override fun record(reader: HostTableReader) {
+ hostExporter.write(reader)
}
- override fun record(reader: HostTableReader) {
- hostWriter.write(reader)
+ override fun record(reader: ServerTableReader) {
+ serverExporter.write(reader)
}
override fun record(reader: ServiceTableReader) {
- serviceWriter.write(reader)
+ serviceExporter.write(reader)
}
override fun close() {
- hostWriter.close()
- serviceWriter.close()
- serverWriter.close()
+ hostExporter.close()
+ serverExporter.close()
+ serviceExporter.close()
+ }
+
+ public companion object {
+ /**
+ * Overloaded constructor with [ComputeExportConfig] as parameter.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ computeExportConfig: ComputeExportConfig,
+ ): ParquetComputeMonitor =
+ invoke(
+ base = base,
+ partition = partition,
+ bufferSize = bufferSize,
+ hostExportColumns = computeExportConfig.hostExportColumns,
+ serverExportColumns = computeExportConfig.serverExportColumns,
+ serviceExportColumns = computeExportConfig.serviceExportColumns,
+ )
+
+ /**
+         * Constructor that loads the default [ExportColumn]s defined in
+         * [DfltHostExportColumns], [DfltServerExportColumns] and [DfltServiceExportColumns],
+         * so that all loaded columns can be used as a fallback whenever an optional parameter is omitted.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null,
+ serverExportColumns: Collection<ExportColumn<ServerTableReader>>? = null,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null,
+ ): ParquetComputeMonitor {
+            // Loads the default columns so that they can be used as a fallback when the optional parameters are omitted.
+ ComputeExportConfig.loadDfltColumns()
+
+ return ParquetComputeMonitor(
+ hostExporter =
+ Exporter(
+ outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
+ columns = hostExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ serverExporter =
+ Exporter(
+ outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
+ columns = serverExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ serviceExporter =
+ Exporter(
+ outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
+ columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ )
+ }
}
}
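
A hedged sketch of how the new factory functions would be invoked; the output directory, partition name and buffer size are illustrative values:

```kotlin
import java.io.File

// Monitor that exports every column currently loaded by the JVM.
fun allColumnsMonitor(outputDir: File): ParquetComputeMonitor =
    ParquetComputeMonitor(
        base = outputDir,
        partition = "seed=0",
        bufferSize = 4096,
        computeExportConfig = ComputeExportConfig.ALL_COLUMNS,
    )

// Monitor with an explicit host column selection; the omitted server/service
// parameters fall back to all loaded columns.
fun selectedColumnsMonitor(outputDir: File): ParquetComputeMonitor =
    ParquetComputeMonitor(
        base = outputDir,
        partition = "seed=0",
        bufferSize = 4096,
        hostExportColumns =
            setOf(
                DfltHostExportColumns.TIMESTAMP,
                DfltHostExportColumns.CPU_USAGE,
                DfltHostExportColumns.POWER_DRAW,
            ),
    )
```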
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
deleted file mode 100644
index 020e67f2..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.HostTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-
-/**
- * A Parquet event writer for [HostTableReader]s.
- */
-public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
- return builder
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "host-writer"
-
- /**
- * A [WriteSupport] implementation for a [HostTableReader].
- */
- private class HostDataWriteSupport() : WriteSupport<HostTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: HostTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: HostTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("host_id", 2)
- consumer.addBinary(Binary.fromString(data.host.id))
- consumer.endField("host_id", 2)
-
- consumer.startField("host_name", 3)
- consumer.addBinary(Binary.fromString(data.host.name))
- consumer.endField("host_name", 3)
-
- consumer.startField("cpu_count", 4)
- consumer.addInteger(data.host.cpuCount)
- consumer.endField("cpu_count", 4)
-
- consumer.startField("mem_capacity", 5)
- consumer.addLong(data.host.memCapacity)
- consumer.endField("mem_capacity", 5)
-
- consumer.startField("guests_terminated", 6)
- consumer.addInteger(data.guestsTerminated)
- consumer.endField("guests_terminated", 6)
-
- consumer.startField("guests_running", 7)
- consumer.addInteger(data.guestsRunning)
- consumer.endField("guests_running", 7)
-
- consumer.startField("guests_error", 8)
- consumer.addInteger(data.guestsError)
- consumer.endField("guests_error", 8)
-
- consumer.startField("guests_invalid", 9)
- consumer.addInteger(data.guestsInvalid)
- consumer.endField("guests_invalid", 9)
-
- consumer.startField("cpu_limit", 10)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 10)
-
- consumer.startField("cpu_usage", 11)
- consumer.addDouble(data.cpuUsage)
- consumer.endField("cpu_usage", 11)
-
- consumer.startField("cpu_demand", 12)
- consumer.addDouble(data.cpuUsage)
- consumer.endField("cpu_demand", 12)
-
- consumer.startField("cpu_utilization", 13)
- consumer.addDouble(data.cpuUtilization)
- consumer.endField("cpu_utilization", 13)
-
- consumer.startField("cpu_time_active", 14)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 14)
-
- consumer.startField("cpu_time_idle", 15)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 15)
-
- consumer.startField("cpu_time_steal", 16)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 16)
-
- consumer.startField("cpu_time_lost", 17)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 17)
-
- consumer.startField("power_draw", 18)
- consumer.addDouble(data.powerDraw)
- consumer.endField("power_draw", 18)
-
- consumer.startField("energy_usage", 19)
- consumer.addDouble(data.energyUsage)
- consumer.endField("energy_usage", 19)
-
- consumer.startField("carbon_intensity", 20)
- consumer.addDouble(data.carbonIntensity)
- consumer.endField("carbon_intensity", 20)
-
- consumer.startField("carbon_emission", 21)
- consumer.addDouble(data.carbonEmission)
- consumer.endField("carbon_emission", 21)
-
- consumer.startField("uptime", 22)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 22)
-
- consumer.startField("downtime", 23)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 23)
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 24)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 24)
- }
-
- val bootTimeAbsolute = data.bootTimeAbsolute
- if (bootTimeAbsolute != null) {
- consumer.startField("boot_time_absolute", 25)
- consumer.addLong(bootTimeAbsolute.toEpochMilli())
- consumer.endField("boot_time_absolute", 25)
- }
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the host data.
- */
- val SCHEMA: MessageType =
- Types
- .buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_name"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_terminated"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_running"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_error"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_invalid"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_demand"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_utilization"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("power_draw"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("energy_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("carbon_intensity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("carbon_emission"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time_absolute"),
- )
- .named("host")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
deleted file mode 100644
index e1b489ac..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.ServerTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-
-/**
- * A Parquet event writer for [ServerTableReader]s.
- */
-public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
- return builder
- .withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "server-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServerTableReader].
- */
- private class ServerDataWriteSupport() : WriteSupport<ServerTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServerTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: ServerTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("server_id", 2)
- consumer.addBinary(Binary.fromString(data.server.id))
- consumer.endField("server_id", 2)
-
- consumer.startField("server_name", 3)
- consumer.addBinary(Binary.fromString(data.server.name))
- consumer.endField("server_name", 3)
-
- val hostId = data.host?.id
- if (hostId != null) {
- consumer.startField("host_id", 4)
- consumer.addBinary(Binary.fromString(hostId))
- consumer.endField("host_id", 4)
- }
-
- consumer.startField("mem_capacity", 5)
- consumer.addLong(data.server.memCapacity)
- consumer.endField("mem_capacity", 5)
-
- consumer.startField("cpu_count", 6)
- consumer.addInteger(data.server.cpuCount)
- consumer.endField("cpu_count", 6)
-
- consumer.startField("cpu_limit", 7)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 7)
-
- consumer.startField("cpu_time_active", 8)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 8)
-
- consumer.startField("cpu_time_idle", 9)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 9)
-
- consumer.startField("cpu_time_steal", 10)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 10)
-
- consumer.startField("cpu_time_lost", 11)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 11)
-
- consumer.startField("uptime", 12)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 12)
-
- consumer.startField("downtime", 13)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 13)
-
- val provisionTime = data.provisionTime
- if (provisionTime != null) {
- consumer.startField("provision_time", 14)
- consumer.addLong(provisionTime.toEpochMilli())
- consumer.endField("provision_time", 14)
- }
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 15)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 15)
- }
-
- val bootTimeAbsolute = data.bootTimeAbsolute
- if (bootTimeAbsolute != null) {
- consumer.startField("boot_time_absolute", 16)
- consumer.addLong(bootTimeAbsolute.toEpochMilli())
- consumer.endField("boot_time_absolute", 16)
- }
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the server data.
- */
- val SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_name"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("provision_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time_absolute"),
- )
- .named("server")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
deleted file mode 100644
index eba8fc4f..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.ServiceTableReader
-import java.io.File
-
-/**
- * A Parquet event writer for [ServiceTableReader]s.
- */
-public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
- override fun toString(): String = "service-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServiceTableReader].
- */
- private class ServiceDataWriteSupport() : WriteSupport<ServiceTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServiceTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: ServiceTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("hosts_up", 2)
- consumer.addInteger(data.hostsUp)
- consumer.endField("hosts_up", 2)
-
- consumer.startField("hosts_down", 3)
- consumer.addInteger(data.hostsDown)
- consumer.endField("hosts_down", 3)
-
- consumer.startField("servers_pending", 4)
- consumer.addInteger(data.serversPending)
- consumer.endField("servers_pending", 4)
-
- consumer.startField("servers_active", 5)
- consumer.addInteger(data.serversActive)
- consumer.endField("servers_active", 5)
-
- consumer.startField("attempts_success", 6)
- consumer.addInteger(data.attemptsSuccess)
- consumer.endField("attempts_pending", 6)
-
- consumer.startField("attempts_failure", 7)
- consumer.addInteger(data.attemptsFailure)
- consumer.endField("attempts_failure", 7)
-
- consumer.startField("attempts_error", 8)
- consumer.addInteger(data.attemptsError)
- consumer.endField("attempts_error", 8)
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- private val SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_up"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_down"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_pending"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_success"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_failure"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_error"),
- )
- .named("service")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
new file mode 100644
index 00000000..f48bc229
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
@@ -0,0 +1,70 @@
+### Summary
+Added an output configuration, definable in the scenario `.json` file, that allows selecting which columns are included in the raw output files `host.parquet`, `server.parquet` and `service.parquet`.
+
+### Columns
+The 'default' columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns (`ExportColumn<Exportable>`) can be defined anywhere, and they are deserializable as long as they are loaded by the JVM; an example is sketched below.
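+
+For instance, an additional host column could be declared as follows (a minimal sketch; the column name `guests_running` and the reader property are hypothetical):
+```kotlin
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+// Hypothetical additional column for host.parquet. ExportColumn registers itself
+// in its init block, so loading this declaration makes the column deserializable.
+val HOST_GUESTS_RUNNING: ExportColumn<HostTableReader> =
+    ExportColumn(
+        field = Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("guests_running"),
+    ) { host: HostTableReader -> host.guestsRunning /* hypothetical property */ }
+```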
+
+### Deserialization
+Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, a default one is used: it matches the column name case-insensitively, with words separated either by `_` (as in the name) or by ` ` (blank space), as illustrated below.
+
+###### E.g.:
+***column name*** = "cpu\_count"
+***default column regex*** = "\\s*(?:cpu_count|cpu count)\\s*" (case-insensitive)
+***matches*** = "cpu\_count", "cpu count", "CpU\_cOuNt", etc.
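+
+In code, the default pattern behaves as follows (a self-contained sketch mirroring the default-regex construction used by `ExportColumn`):
+```kotlin
+fun main() {
+    // Default regex generated for a field named "cpu_count".
+    val regex = Regex("\\s*(?:cpu_count|cpu count)\\s*", RegexOption.IGNORE_CASE)
+
+    check(regex.matches("CpU_cOuNt"))   // case-insensitive
+    check(regex.matches(" cpu count ")) // blank-space separator and surrounding whitespace
+    check(!regex.matches("cpucount"))   // a missing separator does not match
+}
+```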
+
+### JSON Schema
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+ "type": "object",
+ "properties": {
+ "hostExportColumns": { "type": "array" },
+ "serverExportColumns": { "type": "array" } ,
+ "serviceExportColumns": { "type": "array" } ,
+ "required": [ /* NONE REQUIRED */ ]
+ }
+ },
+ ...
+ "required": [
+ ...
+ // NOT REQUIRED
+ ]
+}
+```
+
+&nbsp;
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged.
+- If an empty list of columns is provided, or none of the provided entries is deserializable, then all loaded columns for that `Exportable` are used and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+
+### Example
+
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+ "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"],
+ "serverExportColumns": ["invalid-entry2"],
+ "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"]
+ },
+ ...
+```
+
+```json
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns
+10:51:56.584 [INFO] ScenariosSpec -
+| === Compute Export Config ===
+| Host columns : timestamp, timestamp_absolute, guests_invalid
+| Server columns : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute
+| Service columns : timestamp, servers_active, servers_pending
+
+```
+
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
index d41c6dc0..a7b8bedb 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
@@ -22,12 +22,13 @@
package org.opendc.compute.telemetry.table
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a host trace entry.
*/
-public interface HostTableReader {
+public interface HostTableReader : Exportable {
public fun copy(): HostTableReader
public fun setValues(table: HostTableReader)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
index 51113025..a1aed778 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.telemetry.table
+import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a server trace entry.
*/
-public interface ServerTableReader {
+public interface ServerTableReader : Exportable {
public fun copy(): ServerTableReader
public fun setValues(table: ServerTableReader)
@@ -102,3 +104,6 @@ public interface ServerTableReader {
*/
public val cpuLostTime: Long
}
+
+// Loads the default export fields for deserialization whenever this file is loaded.
+private val _ignore = DfltServerExportColumns
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
index e6c2a1ae..c3a92fc7 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
@@ -22,12 +22,13 @@
package org.opendc.compute.telemetry.table
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a service trace entry.
*/
-public interface ServiceTableReader {
+public interface ServiceTableReader : Exportable {
public fun copy(): ServiceTableReader
public fun setValues(table: ServiceTableReader)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index b48b8fe6..0f76d580 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -159,6 +159,7 @@ public fun addExportModel(
File("${scenario.outputFolder}/raw-output/$index"),
"seed=$seed",
bufferSize = 4096,
+ computeExportConfig = scenario.computeExportConfig,
),
Duration.ofSeconds(scenario.exportModelSpec.exportInterval),
startTime,
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
index 9ce462f4..c31f0300 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/Scenario.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.base.scenario
+import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import org.opendc.experiments.base.scenario.specs.AllocationPolicySpec
import org.opendc.experiments.base.scenario.specs.CheckpointModelSpec
import org.opendc.experiments.base.scenario.specs.ExportModelSpec
@@ -41,6 +42,7 @@ import org.opendc.experiments.base.scenario.specs.WorkloadSpec
* @property name The String representing the name of the scenario. It defaults to an empty string.
* @property runs The Int representing the number of runs of the scenario. It defaults to 1.
* @property initialSeed The Int representing the initial seed of the scenario. It defaults to 0.
+ * @property computeExportConfig configures which parquet columns are to be included in the output files.
*/
public data class Scenario(
var id: Int = -1,
@@ -52,6 +54,7 @@ public data class Scenario(
val carbonTracePath: String? = null,
val exportModelSpec: ExportModelSpec = ExportModelSpec(),
val outputFolder: String = "output",
+ val computeExportConfig: ComputeExportConfig,
val name: String = "",
val runs: Int = 1,
val initialSeed: Int = 0,
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
index e47d9c58..fb43f102 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioFactories.kt
@@ -82,6 +82,7 @@ public fun getScenarios(scenariosSpec: ScenariosSpec): List<Scenario> {
name = scenarioID.toString(),
runs = scenariosSpec.runs,
initialSeed = scenariosSpec.initialSeed,
+ computeExportConfig = scenarioSpec.computeExportConfig,
)
trackScenario(scenarioSpec, outputFolder)
scenarios.add(scenario)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
index a7cda768..1fba71d1 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/ScenarioReader.kt
@@ -25,6 +25,7 @@ package org.opendc.experiments.base.scenario
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
+import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import org.opendc.experiments.base.scenario.specs.ScenariosSpec
import java.io.File
import java.io.InputStream
@@ -35,25 +36,19 @@ public class ScenarioReader {
// private val jsonReader = Json { serializersModule = failureModule }
private val jsonReader = Json
- @OptIn(ExperimentalSerializationApi::class)
- public fun read(file: File): ScenariosSpec {
- val input = file.inputStream()
+ public fun read(file: File): ScenariosSpec = read(file.inputStream())
- return jsonReader.decodeFromStream<ScenariosSpec>(input)
- }
-
- @OptIn(ExperimentalSerializationApi::class)
- public fun read(path: Path): ScenariosSpec {
- val input = path.inputStream()
-
- return jsonReader.decodeFromStream<ScenariosSpec>(input)
- }
+ public fun read(path: Path): ScenariosSpec = read(path.inputStream())
/**
* Read the specified [input].
*/
@OptIn(ExperimentalSerializationApi::class)
public fun read(input: InputStream): ScenariosSpec {
+ // Loads the default parquet output fields,
+ // so that they can be deserialized
+ ComputeExportConfig.loadDfltColumns()
+
return jsonReader.decodeFromStream<ScenariosSpec>(input)
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
index da3ceecf..cb4491b6 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/ScenariosSpec.kt
@@ -23,6 +23,9 @@
package org.opendc.experiments.base.scenario.specs
import kotlinx.serialization.Serializable
+import org.opendc.common.logger.infoNewLine
+import org.opendc.common.logger.logger
+import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import java.util.UUID
@Serializable
@@ -30,6 +33,7 @@ public data class ScenarioSpec(
var id: Int = -1,
var name: String = "",
val outputFolder: String = "output",
+ val computeExportConfig: ComputeExportConfig,
val topology: ScenarioTopologySpec,
val workload: WorkloadSpec,
val allocationPolicy: AllocationPolicySpec = AllocationPolicySpec(),
@@ -50,6 +54,8 @@ public data class ScenarioSpec(
* @property outputFolder
* @property initialSeed
* @property runs
+ * @property computeExportConfig configures which parquet columns are to
+ * be included in the output files.
*/
@Serializable
public data class ScenariosSpec(
@@ -65,6 +71,7 @@ public data class ScenariosSpec(
val failureModels: Set<FailureModelSpec?> = setOf(null),
val checkpointModels: Set<CheckpointModelSpec?> = setOf(null),
val carbonTracePaths: Set<String?> = setOf(null),
+ val computeExportConfig: ComputeExportConfig = ComputeExportConfig.ALL_COLUMNS,
) {
init {
require(runs > 0) { "The number of runs should always be positive" }
@@ -75,6 +82,8 @@ public data class ScenariosSpec(
name = "unnamed-simulation-${UUID.randomUUID().toString().substring(0, 4)}"
// "workload=${workloads[0].name}_topology=${topologies[0].name}_allocationPolicy=${allocationPolicies[0].name}"
}
+
+ LOG.infoNewLine(computeExportConfig.fmt())
}
public fun getCartesian(): Sequence<ScenarioSpec> {
@@ -101,6 +110,7 @@ public data class ScenariosSpec(
id,
name,
outputFolder,
+ computeExportConfig = computeExportConfig,
topologyList[(i / topologyDiv) % topologyList.size],
workloadList[(i / workloadDiv) % workloadList.size],
allocationPolicyList[(i / allocationDiv) % allocationPolicyList.size],
@@ -113,4 +123,8 @@ public data class ScenariosSpec(
}
}
}
+
+ internal companion object {
+ private val LOG by logger()
+ }
}
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 4cdd4350..0a0507ef 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -25,9 +25,17 @@ description = "Parquet helpers for traces in OpenDC"
// Build configuration
plugins {
`kotlin-library-conventions`
+ kotlin("plugin.serialization") version "1.9.22"
}
dependencies {
+ // Needed for ParquetDataWriter
+ implementation(libs.kotlin.logging)
+
+ implementation(projects.opendcCommon)
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
+
// This configuration is necessary for a slim dependency on Apache Parquet
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
index b96ee28b..e4b9a147 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.telemetry.export.parquet
+package org.opendc.trace.util.parquet
import mu.KotlinLogging
import org.apache.parquet.column.ParquetProperties
@@ -28,7 +28,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
new file mode 100644
index 00000000..90e00f4b
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.Type
+import org.opendc.common.logger.logger
+import org.slf4j.Logger
+import kotlin.reflect.KClass
+
+/**
+ * A column that can be used to build a parquet schema to export [T] records.
+ *
+ * See [columnSerializer] for deserialization of this class.
+ *
+ * ```kotlin
+ * class Foo : Exportable {
+ *     ...
+ *     val MY_FIELD = ExportColumn<Foo>(
+ *         field = Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("my_field_name")
+ *     ) { exportable: Foo -> exportable.getMyValue() }
+ * }
+ * ```
+ *
+ * @param[field]
+ * The Apache Parquet field; it includes information such as:
+ * - Required (not)
+ * - Field name
+ * - [PrimitiveType] (e.g. [INT32], [DOUBLE] etc.)
+ * - [LogicalTypeAnnotation] (e.g. TIMESTAMP, etc.)
+ *
+ * @param[getValue]
+ * Retrieves the value to be exported from the [Exportable] of [T] passed as param.
+ * The value returned needs to match the expected [PrimitiveType] defined in the field.
+ *
+ * A second type parameter could have been added to the class to enforce the correct type at compile time;
+ * however, it would have added too much complexity to the interface. `ExportColumn<Exportable>` -> `ExportColumn<Exportable, *>`
+ *
+ * @param[regex] The pattern used to determine whether a string refers to this column.
+ * The default one matches the column name with either underscores or blank
+ * spaces between words in a case-insensitive manner.
+ *
+ * @param[exportableClass]
+ * The [KClass] of the [Exportable]. Used for intuitive log messages. This class
+ * can be instantiated with the inline constructor [Companion.invoke] without providing this parameter.
+ */
+public class ExportColumn<T : Exportable>
+ @PublishedApi
+ internal constructor(
+ public val field: Type,
+ @PublishedApi internal val regex: Regex,
+ @PublishedApi internal val exportableClass: KClass<T>,
+ internal val getValue: (T) -> Any?,
+ ) {
+ /**
+ * The name of the column (e.g. "timestamp").
+ */
+ public val name: String by lazy { field.name }
+
+ /**
+ * The primitive type of the field (e.g. INT32).
+ */
+ public val primitiveTypeName: PrimitiveTypeName by lazy { field.asPrimitiveType().primitiveTypeName }
+
+ init {
+ // Adds the column among those that can be deserialized.
+ addField(this)
+ }
+
+ override fun toString(): String = "[ExportColumn: name=$name, exportable=${exportableClass.simpleName}]"
+
+ public companion object {
+ @PublishedApi
+ internal val LOG: Logger by logger()
+
+ /**
+         * Reified constructor, needed to store the [T] class without providing it as a parameter.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ field: Type,
+ regex: Regex = Regex("\\s*(?:${field.name}|${field.name.replace('_', ' ')})\\s*", RegexOption.IGNORE_CASE),
+ noinline getValue: (T) -> Any?,
+ ): ExportColumn<T> =
+ ExportColumn(
+ field = field,
+ getValue = getValue,
+ exportableClass = T::class,
+ regex = regex,
+ )
+
+ /**
+         * All the columns that have been instantiated. They are added in the `init` block.
+         * Keep in mind that in order to deserialize a column, that column needs to be loaded by the JVM.
+ */
+ @PublishedApi
+ internal val allColumns: MutableSet<ExportColumn<*>> = mutableSetOf()
+
+ @PublishedApi
+ internal val allColumnsLock: Mutex = Mutex()
+
+ /**
+ * Function invoked in the `init` block of each [ExportColumn].
+ * Adds the column to those that can be deserialized.
+ */
+ private fun <T : Exportable> addField(column: ExportColumn<T>): Unit =
+ runBlocking {
+ allColumnsLock.withLock { allColumns.add(column) }
+ }
+
+ /**
+ * @return the [ExportColumn] whose [ExportColumn.regex] matches [columnName] **and**
+ * whose generic type ([Exportable]) is [T] if any, `null` otherwise.
+ *
+         * This method needs to be inlined and reified because runtime type erasure
+         * does not allow type-checking the generic class parameter.
+ */
+ @Suppress("UNCHECKED_CAST") // I do not know why it is needed since the cast is nullable.
+ @PublishedApi
+ internal inline fun <reified T : Exportable> matchingColOrNull(columnName: String): ExportColumn<T>? =
+ runBlocking {
+ val allColumns = allColumnsLock.withLock { allColumns.toSet() }
+
+ allColumns.forEach { column ->
+                    // If it is an ExportColumn of the same type.
+ if (column.exportableClass == T::class) {
+                        // Just a smart cast that always succeeds at runtime because
+                        // of type erasure but is needed at compile time.
+ (column as? ExportColumn<T>)
+ ?.regex
+                            // If columnName matches the column regex.
+ ?.matchEntire(columnName)
+ ?.let { return@runBlocking column }
+ }
+ }
+
+ null
+ }
+
+ /**
+ * Returns all [ExportColumn]s of type [T] that have been loaded up until now.
+ */
+ @Suppress("UNCHECKED_CAST")
+ public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> =
+ runBlocking {
+ allColumnsLock.withLock { allColumns.filter { it.exportableClass == T::class } as List<ExportColumn<T>> }
+ }
+ }
+ }
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
new file mode 100644
index 00000000..e07980f9
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.serialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.jsonArray
+import org.opendc.common.logger.errAndNull
+import org.opendc.common.logger.logger
+
+/**
+ * Returns a serializer for [ExportColumn] of [T] based on [ExportColumn.name]. Export columns can be
+ * deserialized from string values if the string matches a [ExportColumn.regex].
+ *
+ * ###### Note:
+ * - **In order to deserialize columns, they need to be loaded at runtime**.
+ * - **The serializer needs the reified type [T], meaning static deserialization
+ * (e.g. `@Serializable`, `@Serializer`) will not work. The serializer for [ExportColumn] of [T] needs to be retrieved with this method.**
+ *
+ * It is assumed the user always knows what type of column is needed from deserialization,
+ * so columns can be encoded by their name alone, without their type (which would be tedious to write in JSON).
+ *
+ * ```kotlin
+ * // Decode column of Foo
+ * class Foo: Exportable
+ * json.decodeFrom<smth>(deserializer = columnSerializer<Foo>(), <smth>)
+ *
+ * // Decode a collection of columns of Foo
+ * json.decodeFrom<smth>(deserializer = ListSerializer(columnSerializer<Foo>()), <smth>)
+ * ```
+ */
+public inline fun <reified T : Exportable> columnSerializer(): KSerializer<ExportColumn<T>> =
+ object : KSerializer<ExportColumn<T>> {
+ override val descriptor: SerialDescriptor = serialDescriptor<String>()
+
+ override fun deserialize(decoder: Decoder): ExportColumn<T> {
+ val strValue = decoder.decodeString().trim('"')
+ return ExportColumn.matchingColOrNull<T>(strValue)
+ ?: throw SerializationException(
+ "unable to deserialize export column '$strValue'." +
+ "Keep in mind that export columns need to be loaded by the jvm in order to be deserialized",
+ )
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: ExportColumn<T>,
+ ) {
+ encoder.encodeString(value.name)
+ }
+ }
+
+/**
+ * Serializer for a [List] of [ExportColumn] of [T] that ignores unrecognized
+ * column names, logging an error whenever one is encountered.
+ */
+public class ColListSerializer<T : Exportable>(
+ private val columnSerializer: KSerializer<ExportColumn<T>>,
+) : KSerializer<List<ExportColumn<T>>> {
+ private val listSerializer = ListSerializer(columnSerializer)
+ override val descriptor: SerialDescriptor = ListSerializer(columnSerializer).descriptor
+
+ /**
+ * Unrecognized columns are ignored and an error message is logged.
+ *
+ * @return the decoded list of [ExportColumn]s (might be empty).
+ * @throws[SerializationException] if the current element is not a [jsonArray] or its string representation.
+ */
+ override fun deserialize(decoder: Decoder): List<ExportColumn<T>> =
+ (decoder as? JsonDecoder)?.decodeJsonElement()?.jsonArray?.mapNotNull {
+ try {
+ Json.decodeFromJsonElement(columnSerializer, it)
+ } catch (_: Exception) {
+ LOG.errAndNull("no match found for column $it, ignoring...")
+ }
+ } ?: let {
+ val strValue = decoder.decodeString().trim('"')
+ // Basically a recursive call with a json decoder instead of the argument decoder.
+ Json.decodeFromString(strValue)
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: List<ExportColumn<T>>,
+ ) {
+ listSerializer.serialize(encoder, value)
+ }
+
+ private companion object {
+ val LOG by logger()
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
index a2e82df1..61e766d0 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 AtLarge Research
+ * Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.parquet.io.api.Binary
-import java.nio.ByteBuffer
-import java.util.UUID
+package org.opendc.trace.util.parquet.exporter
/**
- * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
+ * Classes that implement this interface can be exported
+ * as records in a parquet file through an [Exporter].
*/
-internal fun UUID.toBinary(): Binary {
- val bb = ByteBuffer.allocate(16)
- bb.putLong(mostSignificantBits)
- bb.putLong(leastSignificantBits)
- bb.rewind()
- return Binary.fromConstantByteBuffer(bb)
+public interface Exportable {
+ public companion object {
+ public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> {
+ return ExportColumn.getAllLoadedColumns()
+ }
+ }
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
new file mode 100644
index 00000000..05f36530
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.trace.util.parquet.ParquetDataWriter
+import java.io.File
+
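+/**
+ * Writes [Exportable]s of type [T] as records of a parquet file, using a schema
+ * built only from the [ExportColumn]s provided to the reified [Companion.invoke]
+ * constructors below.
+ */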
+public class Exporter<T : Exportable>
+ @PublishedApi
+ internal constructor(
+ outputFile: File,
+ writeSupp: WriteSupport<T>,
+ bufferSize: Int,
+ ) : ParquetDataWriter<T>(
+ path = outputFile,
+ writeSupport = writeSupp,
+ bufferSize = bufferSize,
+ ) {
+ public companion object {
+ /**
+         * Reified constructor that allows using the runtime [Class.getSimpleName] of [T] as the schema name.
+ * @param[outputFile] the output file where the [Exportable]s will be written.
+ * @param[columns] the columns that will be included in the output parquet file.
+ * @param[schemaName] the name of the schema of the output parquet file.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ outputFile: File,
+ vararg columns: ExportColumn<T> = emptyArray(),
+ schemaName: String? = null,
+ bufferSize: Int = 4096,
+ ): Exporter<T> =
+ Exporter(
+ outputFile = outputFile,
+ writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+ bufferSize = bufferSize,
+ )
+
+ /**
+         * Reified constructor that allows using the runtime [Class.getSimpleName] of [T] as the schema name.
+ * @param[outputFile] the output file where the [Exportable]s will be written.
+ * @param[columns] the columns that will be included in the output parquet file.
+ * @param[schemaName] the name of the schema of the output parquet file.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ outputFile: File,
+ columns: Collection<ExportColumn<T>> = emptySet(),
+ schemaName: String? = null,
+ bufferSize: Int = 4096,
+ ): Exporter<T> =
+ Exporter(
+ outputFile = outputFile,
+ writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+ bufferSize = bufferSize,
+ )
+
+ /**
+ * @return an anonymous [WriteSupport] for [T] with only the columns included in [columns].
+ */
+ @PublishedApi
+ internal fun <T : Exportable> writeSuppFor(
+ columns: Set<ExportColumn<T>>,
+ schemaName: String,
+ ): WriteSupport<T> =
+ object : WriteSupport<T>() {
+ private lateinit var cons: RecordConsumer
+
+ private val schema: MessageType =
+ Types
+ .buildMessage()
+ .addFields(*columns.map { it.field }.toTypedArray())
+ .named(schemaName)
+
+ override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ cons = recordConsumer
+ }
+
+ override fun write(record: T) =
+ with(cons) {
+ startMessage()
+
+ columns.forEachIndexed { idx, column ->
+ fun <T> Any.castedOrThrow(): T {
+ @Suppress("UNCHECKED_CAST")
+ return (this as? T) ?: throw TypeCastException(
+ "attempt to add value of type ${this::class} to export " +
+ "field $column which requires a different type",
+ )
+ }
+ val valueToAdd: Any =
+ column.getValue(
+ record,
+ ) ?: return@forEachIndexed // Maybe add explicit check for optional fields
+
+ startField(column.name, idx)
+ when (column.primitiveTypeName) {
+ INT32 -> addInteger(valueToAdd.castedOrThrow())
+ INT64 -> addLong(valueToAdd.castedOrThrow())
+ DOUBLE -> addDouble(valueToAdd.castedOrThrow())
+ BINARY -> addBinary(valueToAdd.castedOrThrow())
+ FLOAT -> addFloat(valueToAdd.castedOrThrow())
+ BOOLEAN -> addBoolean(valueToAdd.castedOrThrow())
+ else -> throw RuntimeException(
+                                "parquet primitive type name '${column.primitiveTypeName}' is not supported",
+ )
+ }
+ endField(column.name, idx)
+ }
+
+ cons.endMessage()
+ }
+ }
+ }
+ }