diff options
| author | Alessio Leonardo Tomei <122273875+T0mexX@users.noreply.github.com> | 2024-08-22 16:45:22 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-22 16:45:22 +0200 |
| commit | f9ffdfb29a3f08ac11e739494e754c81ef4f5157 (patch) | |
| tree | abc65714427e2738e5278032230ba60a9b6f0a28 /opendc-compute/opendc-compute-telemetry/src | |
| parent | 4f98fb2bf8204f6af52cd6eeb3313d21c6ca95bc (diff) | |
Refactored exporters. Allows output column selection in scenario (#241)
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry/src')
14 files changed, 797 insertions, 846 deletions
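The new `ComputeExportConfig` and the reworked `ParquetComputeMonitor` factory introduced in this diff could be wired together roughly as in the sketch below; the output path, partition name, and column picks are illustrative only and not taken from this change.

```kotlin
import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import org.opendc.compute.telemetry.export.parquet.DfltHostExportColumns
import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns
import org.opendc.compute.telemetry.export.parquet.DfltServiceExportColumns
import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor
import java.io.File

fun main() {
    // Pick the columns to export; the Collection-based constructor always adds
    // the BASE_EXPORT_COLUMNS (timestamp, timestamp_absolute) on top of these.
    val config =
        ComputeExportConfig(
            hostExportColumns = listOf(DfltHostExportColumns.CPU_USAGE, DfltHostExportColumns.ENERGY_USAGE),
            serverExportColumns = listOf(DfltServerExportColumns.SERVER_ID, DfltServerExportColumns.CPU_LIMIT),
            serviceExportColumns = listOf(DfltServiceExportColumns.SERVERS_ACTIVE),
        )

    // The companion-object factory builds one Exporter per output table
    // (host.parquet, server.parquet, service.parquet) under output/seed=0/.
    val monitor =
        ParquetComputeMonitor(
            base = File("output"),
            partition = "seed=0",
            bufferSize = 4096,
            computeExportConfig = config,
        )

    monitor.close()
}
```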
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt new file mode 100644 index 00000000..02e3e0bb --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.builtins.ListSerializer +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.encodeStructure +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonDecoder +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.jsonObject +import org.opendc.common.logger.logger +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ColListSerializer +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.columnSerializer + +/** + * Aggregates the necessary settings to personalize the output + * parquet files for compute workloads. + * + * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file. + * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file. + * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file. 
+ */ +@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class) +public data class ComputeExportConfig( + public val hostExportColumns: Set<ExportColumn<HostTableReader>>, + public val serverExportColumns: Set<ExportColumn<ServerTableReader>>, + public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>, +) { + public constructor( + hostExportColumns: Collection<ExportColumn<HostTableReader>>, + serverExportColumns: Collection<ExportColumn<ServerTableReader>>, + serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>, + ) : this( + hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS, + serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS, + serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS, + ) + + /** + * @return formatted string representing the export config. + */ + public fun fmt(): String = + """ + | === Compute Export Config === + | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')} + | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')} + | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')} + """.trimIndent() + + public companion object { + internal val LOG by logger() + + /** + * Force the jvm to load the default [ExportColumn]s relevant to compute export, + * so that they are available for deserialization. + */ + public fun loadDfltColumns() { + DfltHostExportColumns + DfltServerExportColumns + DfltServiceExportColumns + } + + /** + * Config that includes all columns defined in [DfltHostExportColumns], + * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded + * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader]. + */ + public val ALL_COLUMNS: ComputeExportConfig by lazy { + loadDfltColumns() + ComputeExportConfig( + hostExportColumns = ExportColumn.getAllLoadedColumns(), + serverExportColumns = ExportColumn.getAllLoadedColumns(), + serviceExportColumns = ExportColumn.getAllLoadedColumns(), + ) + } + + /** + * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs. + * + * This serializer makes use of reified column serializers for the 2 properties. + */ + internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> { + override val descriptor: SerialDescriptor = + buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") { + element( + "hostExportColumns", + ListSerializer(columnSerializer<HostTableReader>()).descriptor, + ) + element( + "serverExportColumns", + ListSerializer(columnSerializer<ServerTableReader>()).descriptor, + ) + element( + "serviceExportColumns", + ListSerializer(columnSerializer<ServiceTableReader>()).descriptor, + ) + } + + override fun deserialize(decoder: Decoder): ComputeExportConfig { + val jsonDec = + (decoder as? JsonDecoder) ?: let { + // Basically a recursive call with a JsonDecoder. + return json.decodeFromString(decoder.decodeString().trim('"')) + } + + // Loads the default columns so that they are available for deserialization. 
+ loadDfltColumns() + val elem = jsonDec.decodeJsonElement().jsonObject + + val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList() + val serverFields: List<ExportColumn<ServerTableReader>> = elem["serverExportColumns"].toFieldList() + val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList() + + return ComputeExportConfig( + hostExportColumns = hostFields, + serverExportColumns = serverFields, + serviceExportColumns = serviceFields, + ) + } + + override fun serialize( + encoder: Encoder, + value: ComputeExportConfig, + ) { + encoder.encodeStructure(descriptor) { + encodeSerializableElement( + descriptor, + 0, + ColListSerializer(columnSerializer<HostTableReader>()), + value.hostExportColumns.toList(), + ) + encodeSerializableElement( + descriptor, + 1, + ColListSerializer(columnSerializer<ServerTableReader>()), + value.serverExportColumns.toList(), + ) + encodeSerializableElement( + descriptor, + 2, + ColListSerializer(columnSerializer<ServiceTableReader>()), + value.serviceExportColumns.toList(), + ) + } + } + } + } +} + +private val json = Json { ignoreUnknownKeys = true } + +private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> = + this?.let { + json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it) + }?.ifEmpty { + ComputeExportConfig.LOG.warn( + "deserialized list of export columns for exportable ${T::class.simpleName} " + + "produced empty list, falling back to all loaded columns", + ) + ExportColumn.getAllLoadedColumns<T>() + } ?: ExportColumn.getAllLoadedColumns<T>() diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt new file mode 100644 index 00000000..68b5a664 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.HostTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltHostExportColumns + * ... + * ``` + */ +public object DfltHostExportColumns { + public val TIMESTAMP: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOST_ID: ExportColumn<HostTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { Binary.fromString(it.host.id) } + + public val HOST_NAME: ExportColumn<HostTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_name"), + ) { Binary.fromString(it.host.name) } + + public val CPU_COUNT: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.host.cpuCount } + + public val MEM_CAPACITY: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.host.memCapacity } + + public val GUESTS_TERMINATED: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_terminated"), + ) { it.guestsTerminated } + + public val GUESTS_RUNNING: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_running"), + ) { it.guestsRunning } + + public val GUESTS_ERROR: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_error"), + ) { it.guestsError } + + public val GUESTS_INVALID: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT32).named("guests_invalid"), + ) { it.guestsInvalid } + + public val CPU_LIMIT: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_USAGE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_usage"), + ) { it.cpuUsage } + + public val CPU_DEMAND: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_demand"), + ) { it.cpuDemand } + + public val CPU_UTILIZATION: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_utilization"), + ) { it.cpuUtilization } + + public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> = + ExportColumn( + field = 
Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val POWER_DRAW: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("power_draw"), + ) { it.powerDraw } + + public val ENERGY_USAGE: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("energy_usage"), + ) { it.energyUsage } + + public val CARBON_INTENSITY: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_intensity"), + ) { it.carbonIntensity } + + public val CARBON_EMISSION: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("carbon_emission"), + ) { it.carbonEmission } + + public val UP_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val BOOT_TIME: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn<HostTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt new file mode 100644 index 00000000..91d6c9bf --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServerTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltServerExportColumns + * ... + * ``` + */ +public object DfltServerExportColumns { + public val TIMESTAMP: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val SERVER_ID: ExportColumn<ServerTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_id"), + ) { Binary.fromString(it.server.id) } + + public val HOST_ID: ExportColumn<ServerTableReader> = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("host_id"), + ) { it.host?.id?.let { Binary.fromString(it) } } + + public val SERVER_NAME: ExportColumn<ServerTableReader> = + ExportColumn( + field = + Types.required(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("server_name"), + ) { Binary.fromString(it.server.name) } + + public val CPU_COUNT: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT32).named("cpu_count"), + ) { it.server.cpuCount } + + public val MEM_CAPACITY: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("mem_capacity"), + ) { it.server.memCapacity } + + public val CPU_LIMIT: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(DOUBLE).named("cpu_limit"), + ) { it.cpuLimit } + + public val CPU_TIME_ACTIVE: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_active"), + ) { it.cpuActiveTime } + + public val CPU_TIME_IDLE: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_idle"), + ) { it.cpuIdleTime } + + public val CPU_TIME_STEAL: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_steal"), + ) { it.cpuStealTime } + + public val CPU_TIME_LOST: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("cpu_time_lost"), + ) { it.cpuLostTime } + + public val 
UP_TIME: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("uptime"), + ) { it.uptime } + + public val DOWN_TIME: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.required(INT64).named("downtime"), + ) { it.downtime } + + public val PROVISION_TIME: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.optional(INT64).named("provision_time"), + ) { it.provisionTime?.toEpochMilli() } + + public val BOOT_TIME: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time"), + ) { it.bootTime?.toEpochMilli() } + + public val BOOT_TIME_ABS: ExportColumn<ServerTableReader> = + ExportColumn( + field = Types.optional(INT64).named("boot_time_absolute"), + ) { it.bootTimeAbsolute?.toEpochMilli() } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt new file mode 100644 index 00000000..89396545 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.telemetry.export.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32 +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64 +import org.apache.parquet.schema.Types +import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn + +/** + * This object wraps the [ExportColumn]s to solves ambiguity for field + * names that are included in more than 1 exportable. + * + * Additionally, it allows to load all the fields at once by just its symbol, + * so that these columns can be deserialized. Additional fields can be added + * from anywhere, and they are deserializable as long as they are loaded by the jvm. + * + * ```kotlin + * ... + * // Loads the column + * DfltServiceExportColumns + * ... 
+ * ``` + */ +public object DfltServiceExportColumns { + public val TIMESTAMP: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp"), + ) { it.timestamp.toEpochMilli() } + + public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT64).named("timestamp_absolute"), + ) { it.timestampAbsolute.toEpochMilli() } + + public val HOSTS_UP: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("hosts_up"), + ) { it.hostsUp } + + public val SERVERS_PENDING: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("servers_pending"), + ) { it.serversPending } + + public val SERVERS_ACTIVE: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("servers_active"), + ) { it.serversActive } + + public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("attempts_success"), + ) { it.attemptsSuccess } + + public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("attempts_failure"), + ) { it.attemptsFailure } + + public val ATTEMPTS_ERROR: ExportColumn<ServiceTableReader> = + ExportColumn( + field = Types.required(INT32).named("attempts_error"), + ) { it.attemptsError } + + /** + * The columns that are always included in the output file. + */ + internal val BASE_EXPORT_COLUMNS = + setOf( + TIMESTAMP_ABS, + TIMESTAMP, + ) +} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt index 1c910497..6bea4cc2 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt @@ -26,45 +26,100 @@ import org.opendc.compute.telemetry.ComputeMonitor import org.opendc.compute.telemetry.table.HostTableReader import org.opendc.compute.telemetry.table.ServerTableReader import org.opendc.compute.telemetry.table.ServiceTableReader +import org.opendc.trace.util.parquet.exporter.ExportColumn +import org.opendc.trace.util.parquet.exporter.Exportable +import org.opendc.trace.util.parquet.exporter.Exporter import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file.
*/ -public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = - ParquetServerDataWriter( - File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val hostWriter = - ParquetHostDataWriter( - File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - private val serviceWriter = - ParquetServiceDataWriter( - File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, - bufferSize, - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) +public class ParquetComputeMonitor( + private val hostExporter: Exporter<HostTableReader>, + private val serverExporter: Exporter<ServerTableReader>, + private val serviceExporter: Exporter<ServiceTableReader>, +) : ComputeMonitor, AutoCloseable { + override fun record(reader: HostTableReader) { + hostExporter.write(reader) } - override fun record(reader: HostTableReader) { - hostWriter.write(reader) + override fun record(reader: ServerTableReader) { + serverExporter.write(reader) } override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) + serviceExporter.write(reader) } override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() + hostExporter.close() + serverExporter.close() + serviceExporter.close() + } + + public companion object { + /** + * Overloaded constructor with [ComputeExportConfig] as parameter. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + computeExportConfig: ComputeExportConfig, + ): ParquetComputeMonitor = + invoke( + base = base, + partition = partition, + bufferSize = bufferSize, + hostExportColumns = computeExportConfig.hostExportColumns, + serverExportColumns = computeExportConfig.serverExportColumns, + serviceExportColumns = computeExportConfig.serviceExportColumns, + ) + + /** + * Constructor that loads default [ExportColumn]s defined in + * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns] + * in case optional parameters are omitted and all fields need to be retrieved. + * + * @param[base] parent pathname for output file. + * @param[partition] child pathname for output file. + * @param[bufferSize] size of the buffer used by the writer thread. + */ + public operator fun invoke( + base: File, + partition: String, + bufferSize: Int, + hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null, + serverExportColumns: Collection<ExportColumn<ServerTableReader>>? = null, + serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null, + ): ParquetComputeMonitor { + // Loads the fields in case they need to be retrieved if optional params are omitted. 
+ ComputeExportConfig.loadDfltColumns() + + return ParquetComputeMonitor( + hostExporter = + Exporter( + outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, + columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serverExporter = + Exporter( + outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() }, + columns = serverExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + serviceExporter = + Exporter( + outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, + columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), + bufferSize = bufferSize, + ), + ) + } } } diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt deleted file mode 100644 index b96ee28b..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import mu.KotlinLogging -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * A writer that writes data in Parquet format. - * - * @param path The path to the file to write the data to. - * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. - */ -public abstract class ParquetDataWriter<in T>( - path: File, - private val writeSupport: WriteSupport<T>, - bufferSize: Int = 4096, -) : AutoCloseable { - /** - * The logging instance to use. - */ - private val logger = KotlinLogging.logger {} - - /** - * The queue of records to process. 
- */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) - - /** - * An exception to be propagated to the actual writer. - */ - private var exception: Throwable? = null - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = - thread(start = false, name = this.toString()) { - val writer = - let { - val builder = - LocalParquetWriter.builder(path.toPath(), writeSupport) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - buildWriter(builder) - } - - val queue = queue - val buf = mutableListOf<T>() - var shouldStop = false - - try { - while (!shouldStop) { - try { - writer.write(queue.take()) - } catch (e: InterruptedException) { - shouldStop = true - } - - if (queue.drainTo(buf) > 0) { - for (data in buf) { - writer.write(data) - } - buf.clear() - } - } - } catch (e: Throwable) { - logger.error(e) { "Failure in Parquet data writer" } - exception = e - } finally { - writer.close() - } - } - - /** - * Build the [ParquetWriter] used to write the Parquet files. - */ - protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { - return builder.build() - } - - /** - * Write the specified metrics to the database. - */ - public fun write(data: T) { - val exception = exception - if (exception != null) { - throw IllegalStateException("Writer thread failed", exception) - } - - queue.put(data) - } - - /** - * Signal the writer to stop. - */ - override fun close() { - writerThread.interrupt() - writerThread.join() - } - - init { - writerThread.start() - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt deleted file mode 100644 index 020e67f2..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.HostTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [HostTableReader]s. - */ -public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { - return builder - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "host-writer" - - /** - * A [WriteSupport] implementation for a [HostTableReader]. - */ - private class HostDataWriteSupport() : WriteSupport<HostTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: HostTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: HostTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("host_id", 2) - consumer.addBinary(Binary.fromString(data.host.id)) - consumer.endField("host_id", 2) - - consumer.startField("host_name", 3) - consumer.addBinary(Binary.fromString(data.host.name)) - consumer.endField("host_name", 3) - - consumer.startField("cpu_count", 4) - consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 4) - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.host.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("guests_terminated", 6) - consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 6) - - consumer.startField("guests_running", 7) - consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 7) - - consumer.startField("guests_error", 8) - consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 8) - - consumer.startField("guests_invalid", 9) - consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 9) - - consumer.startField("cpu_limit", 10) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 10) - - consumer.startField("cpu_usage", 11) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_usage", 11) - - consumer.startField("cpu_demand", 12) - consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_demand", 12) - - consumer.startField("cpu_utilization", 13) - consumer.addDouble(data.cpuUtilization) - consumer.endField("cpu_utilization", 13) - - consumer.startField("cpu_time_active", 14) - consumer.addLong(data.cpuActiveTime) - 
consumer.endField("cpu_time_active", 14) - - consumer.startField("cpu_time_idle", 15) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 15) - - consumer.startField("cpu_time_steal", 16) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 16) - - consumer.startField("cpu_time_lost", 17) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 17) - - consumer.startField("power_draw", 18) - consumer.addDouble(data.powerDraw) - consumer.endField("power_draw", 18) - - consumer.startField("energy_usage", 19) - consumer.addDouble(data.energyUsage) - consumer.endField("energy_usage", 19) - - consumer.startField("carbon_intensity", 20) - consumer.addDouble(data.carbonIntensity) - consumer.endField("carbon_intensity", 20) - - consumer.startField("carbon_emission", 21) - consumer.addDouble(data.carbonEmission) - consumer.endField("carbon_emission", 21) - - consumer.startField("uptime", 22) - consumer.addLong(data.uptime) - consumer.endField("uptime", 22) - - consumer.startField("downtime", 23) - consumer.addLong(data.downtime) - consumer.endField("downtime", 23) - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 24) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 24) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 25) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 25) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the host data. - */ - val SCHEMA: MessageType = - Types - .buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_name"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_demand"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_utilization"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("power_draw"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("energy_usage"), - Types - 
.required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_intensity"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("carbon_emission"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("host") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt deleted file mode 100644 index e1b489ac..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServerTableReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.io.File - -/** - * A Parquet event writer for [ServerTableReader]s. - */ -public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { - return builder - .withDictionaryEncoding("server_id", true) - .withDictionaryEncoding("host_id", true) - .build() - } - - override fun toString(): String = "server-writer" - - /** - * A [WriteSupport] implementation for a [ServerTableReader]. 
- */ - private class ServerDataWriteSupport() : WriteSupport<ServerTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServerTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServerTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("server_id", 2) - consumer.addBinary(Binary.fromString(data.server.id)) - consumer.endField("server_id", 2) - - consumer.startField("server_name", 3) - consumer.addBinary(Binary.fromString(data.server.name)) - consumer.endField("server_name", 3) - - val hostId = data.host?.id - if (hostId != null) { - consumer.startField("host_id", 4) - consumer.addBinary(Binary.fromString(hostId)) - consumer.endField("host_id", 4) - } - - consumer.startField("mem_capacity", 5) - consumer.addLong(data.server.memCapacity) - consumer.endField("mem_capacity", 5) - - consumer.startField("cpu_count", 6) - consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 6) - - consumer.startField("cpu_limit", 7) - consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 7) - - consumer.startField("cpu_time_active", 8) - consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 8) - - consumer.startField("cpu_time_idle", 9) - consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 9) - - consumer.startField("cpu_time_steal", 10) - consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 10) - - consumer.startField("cpu_time_lost", 11) - consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 11) - - consumer.startField("uptime", 12) - consumer.addLong(data.uptime) - consumer.endField("uptime", 12) - - consumer.startField("downtime", 13) - consumer.addLong(data.downtime) - consumer.endField("downtime", 13) - - val provisionTime = data.provisionTime - if (provisionTime != null) { - consumer.startField("provision_time", 14) - consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 14) - } - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 15) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 15) - } - - val bootTimeAbsolute = data.bootTimeAbsolute - if (bootTimeAbsolute != null) { - consumer.startField("boot_time_absolute", 16) - consumer.addLong(bootTimeAbsolute.toEpochMilli()) - consumer.endField("boot_time_absolute", 16) - } - - consumer.endMessage() - } - } - - private companion object { - /** - * The schema of the server data. 
- */ - val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("server_name"), - Types - .optional(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("host_id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_limit"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_idle"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_steal"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("boot_time_absolute"), - ) - .named("server") - } -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt deleted file mode 100644 index eba8fc4f..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.compute.telemetry.table.ServiceTableReader -import java.io.File - -/** - * A Parquet event writer for [ServiceTableReader]s. - */ -public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) { - override fun toString(): String = "service-writer" - - /** - * A [WriteSupport] implementation for a [ServiceTableReader]. - */ - private class ServiceDataWriteSupport() : WriteSupport<ServiceTableReader>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ServiceTableReader) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - data: ServiceTableReader, - ) { - consumer.startMessage() - - consumer.startField("timestamp", 0) - consumer.addLong(data.timestamp.toEpochMilli()) - consumer.endField("timestamp", 0) - - consumer.startField("timestamp_absolute", 1) - consumer.addLong(data.timestampAbsolute.toEpochMilli()) - consumer.endField("timestamp_absolute", 1) - - consumer.startField("hosts_up", 2) - consumer.addInteger(data.hostsUp) - consumer.endField("hosts_up", 2) - - consumer.startField("hosts_down", 3) - consumer.addInteger(data.hostsDown) - consumer.endField("hosts_down", 3) - - consumer.startField("servers_pending", 4) - consumer.addInteger(data.serversPending) - consumer.endField("servers_pending", 4) - - consumer.startField("servers_active", 5) - consumer.addInteger(data.serversActive) - consumer.endField("servers_active", 5) - - consumer.startField("attempts_success", 6) - consumer.addInteger(data.attemptsSuccess) - consumer.endField("attempts_pending", 6) - - consumer.startField("attempts_failure", 7) - consumer.addInteger(data.attemptsFailure) - consumer.endField("attempts_failure", 7) - - consumer.startField("attempts_error", 8) - consumer.addInteger(data.attemptsError) - consumer.endField("attempts_error", 8) - - consumer.endMessage() - } - } - - private companion object { - private val SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("timestamp_absolute"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_up"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("hosts_down"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_pending"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("servers_active"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_success"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_failure"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("attempts_error"), - ) - .named("service") - } -} diff --git 
a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md new file mode 100644 index 00000000..f48bc229 --- /dev/null +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md @@ -0,0 +1,70 @@ +### Summary +Added an output configuration, which can be defined in the scenario `.json` file and allows selecting which columns are included in the raw output files `host.parquet`, `server.parquet` and `service.parquet`. + +### Columns +The 'default' columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be defined anywhere (`ExportColumn<Exportable>`), and they are deserializable as long as they are loaded by the JVM. + +### Deserialization +Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in a case-insensitive manner, with either `_` (as in the name) or ` ` (blank space) as separator. + +###### E.g.: +***column name*** = "cpu\_count" +***default column regex*** = "\\s*(?:cpu_count|cpu count)\\s*" (case insensitive) +***matches*** = "cpu\_count", "cpu count", "CpU\_cOuNt" etc. + +### JSON Schema +```json +// scenario.json +{ + ... + "computeExportConfig": { + "type": "object", + "properties": { + "hostExportColumns": { "type": "array" }, + "serverExportColumns": { "type": "array" }, + "serviceExportColumns": { "type": "array" } + }, + "required": [ /* NONE REQUIRED */ ] + }, + ... + "required": [ + ... + // NOT REQUIRED + ] +} +``` + + +###### Bad Formatting Cases +- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged. +- If an empty list of columns is provided, or the provided entries were not deserializable, then all loaded columns for that `Exportable` are used, and a warning message is logged. +- If no list is provided, then all loaded columns for that `Exportable` are used. + + +### Example + +```json +// scenario.json +{ + ... + "computeExportConfig": { + "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"], + "serverExportColumns": ["invalid-entry2"], + "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"] + }, + ... +``` + +```json +// console output +10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring... +10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+
+### JSON Schema
+```json
+// scenario.json
+{
+    ...
+    "computeExportConfig": {
+        "type": "object",
+        "properties": {
+            "hostExportColumns": { "type": "array" },
+            "serverExportColumns": { "type": "array" },
+            "serviceExportColumns": { "type": "array" }
+        },
+        "required": [ /* NONE REQUIRED */ ]
+    },
+    ...
+    "required": [
+        ...
+        // NOT REQUIRED
+    ]
+}
+```
+
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged.
+- If an empty list of columns is provided, or none of the provided entries is deserializable, then all loaded columns for that `Exportable` are used and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+### Example
+
+```json
+// scenario.json
+{
+    ...
+    "computeExportConfig": {
+        "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"],
+        "serverExportColumns": ["invalid-entry2"],
+        "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"]
+    },
+    ...
+}
+```
+
+```
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns
+10:51:56.584 [INFO] ScenariosSpec -
+| === Compute Export Config ===
+| Host columns    : timestamp, timestamp_absolute, guests_invalid
+| Server columns  : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute
+| Service columns : timestamp, servers_active, servers_pending
+```
+
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt deleted file mode 100644 index a2e82df1..00000000 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.telemetry.export.parquet - -import org.apache.parquet.io.api.Binary -import java.nio.ByteBuffer -import java.util.UUID - -/** - * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet. - */ -internal fun UUID.toBinary(): Binary { - val bb = ByteBuffer.allocate(16) - bb.putLong(mostSignificantBits) - bb.putLong(leastSignificantBits) - bb.rewind() - return Binary.fromConstantByteBuffer(bb) -} diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt index d41c6dc0..a7b8bedb 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -22,12 +22,13 @@ package org.opendc.compute.telemetry.table +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a host trace entry.
*/ -public interface HostTableReader { +public interface HostTableReader : Exportable { public fun copy(): HostTableReader public fun setValues(table: HostTableReader) diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt index 51113025..a1aed778 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -22,12 +22,14 @@ package org.opendc.compute.telemetry.table +import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a server trace entry. */ -public interface ServerTableReader { +public interface ServerTableReader : Exportable { public fun copy(): ServerTableReader public fun setValues(table: ServerTableReader) @@ -102,3 +104,6 @@ public interface ServerTableReader { */ public val cpuLostTime: Long } + +// Loads the default export fields for deserialization whenever this file is loaded. +private val _ignore = DfltServerExportColumns diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index e6c2a1ae..c3a92fc7 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -22,12 +22,13 @@ package org.opendc.compute.telemetry.table +import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant /** * An interface that is used to read a row of a service trace entry. */ -public interface ServiceTableReader { +public interface ServiceTableReader : Exportable { public fun copy(): ServiceTableReader public fun setValues(table: ServiceTableReader) |
