path: root/opendc-compute/opendc-compute-telemetry
author     Alessio Leonardo Tomei <122273875+T0mexX@users.noreply.github.com>  2024-08-22 16:45:22 +0200
committer  GitHub <noreply@github.com>  2024-08-22 16:45:22 +0200
commit     f9ffdfb29a3f08ac11e739494e754c81ef4f5157 (patch)
tree       abc65714427e2738e5278032230ba60a9b6f0a28 /opendc-compute/opendc-compute-telemetry
parent     4f98fb2bf8204f6af52cd6eeb3313d21c6ca95bc (diff)
Refactored exporters. Allows output column selection in scenario (#241)
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry')
-rw-r--r--  opendc-compute/opendc-compute-telemetry/build.gradle.kts  3
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt  192
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt  195
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt  153
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt  95
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt  109
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt  135
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt  280
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt  224
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt  139
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md  70
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt  38
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt  3
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt  7
-rw-r--r--  opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt  3
15 files changed, 800 insertions, 846 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
index 10f7c610..e8692449 100644
--- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts
+++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
@@ -25,12 +25,15 @@ description = "OpenDC Compute Service implementation"
// Build configuration
plugins {
`kotlin-library-conventions`
+ kotlin("plugin.serialization") version "1.9.22"
}
dependencies {
api(projects.opendcCompute.opendcComputeApi)
+ api(projects.opendcTrace.opendcTraceParquet)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
+ implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-carbon")))
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt
new file mode 100644
index 00000000..02e3e0bb
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ComputeExportConfig.kt
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.buildClassSerialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.encoding.encodeStructure
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.JsonElement
+import kotlinx.serialization.json.jsonObject
+import org.opendc.common.logger.logger
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ColListSerializer
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.columnSerializer
+
+/**
+ * Aggregates the necessary settings to personalize the output
+ * parquet files for compute workloads.
+ *
+ * @param[hostExportColumns] the columns that will be included in the `host.parquet` raw output file.
+ * @param[serverExportColumns] the columns that will be included in the `server.parquet` raw output file.
+ * @param[serviceExportColumns] the columns that will be included in the `service.parquet` raw output file.
+ */
+@Serializable(with = ComputeExportConfig.Companion.ComputeExportConfigSerializer::class)
+public data class ComputeExportConfig(
+ public val hostExportColumns: Set<ExportColumn<HostTableReader>>,
+ public val serverExportColumns: Set<ExportColumn<ServerTableReader>>,
+ public val serviceExportColumns: Set<ExportColumn<ServiceTableReader>>,
+) {
+ public constructor(
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>,
+ serverExportColumns: Collection<ExportColumn<ServerTableReader>>,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>,
+ ) : this(
+ hostExportColumns.toSet() + DfltHostExportColumns.BASE_EXPORT_COLUMNS,
+ serverExportColumns.toSet() + DfltServerExportColumns.BASE_EXPORT_COLUMNS,
+ serviceExportColumns.toSet() + DfltServiceExportColumns.BASE_EXPORT_COLUMNS,
+ )
+
+ /**
+ * @return formatted string representing the export config.
+ */
+ public fun fmt(): String =
+ """
+ | === Compute Export Config ===
+ | Host columns : ${hostExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Server columns : ${serverExportColumns.map { it.name }.toString().trim('[', ']')}
+ | Service columns : ${serviceExportColumns.map { it.name }.toString().trim('[', ']')}
+ """.trimIndent()
+
+ public companion object {
+ internal val LOG by logger()
+
+ /**
+ * Force the JVM to load the default [ExportColumn]s relevant to compute export,
+ * so that they are available for deserialization.
+ */
+ public fun loadDfltColumns() {
+ DfltHostExportColumns
+ DfltServerExportColumns
+ DfltServiceExportColumns
+ }
+
+ /**
+ * Config that includes all columns defined in [DfltHostExportColumns],
+ * [DfltServerExportColumns], [DfltServiceExportColumns] among all other loaded
+ * columns for [HostTableReader], [ServerTableReader] and [ServiceTableReader].
+ */
+ public val ALL_COLUMNS: ComputeExportConfig by lazy {
+ loadDfltColumns()
+ ComputeExportConfig(
+ hostExportColumns = ExportColumn.getAllLoadedColumns(),
+ serverExportColumns = ExportColumn.getAllLoadedColumns(),
+ serviceExportColumns = ExportColumn.getAllLoadedColumns(),
+ )
+ }
+
+ /**
+ * A runtime [KSerializer] is needed for reasons explained in [columnSerializer] docs.
+ *
+ * This serializer makes use of reified column serializers for the 3 properties.
+ */
+ internal object ComputeExportConfigSerializer : KSerializer<ComputeExportConfig> {
+ override val descriptor: SerialDescriptor =
+ buildClassSerialDescriptor("org.opendc.compute.telemetry.export.parquet.ComputeExportConfig") {
+ element(
+ "hostExportColumns",
+ ListSerializer(columnSerializer<HostTableReader>()).descriptor,
+ )
+ element(
+ "serverExportColumns",
+ ListSerializer(columnSerializer<ServerTableReader>()).descriptor,
+ )
+ element(
+ "serviceExportColumns",
+ ListSerializer(columnSerializer<ServiceTableReader>()).descriptor,
+ )
+ }
+
+ override fun deserialize(decoder: Decoder): ComputeExportConfig {
+ val jsonDec =
+ (decoder as? JsonDecoder) ?: let {
+ // Basically a recursive call with a JsonDecoder.
+ return json.decodeFromString(decoder.decodeString().trim('"'))
+ }
+
+ // Loads the default columns so that they are available for deserialization.
+ loadDfltColumns()
+ val elem = jsonDec.decodeJsonElement().jsonObject
+
+ val hostFields: List<ExportColumn<HostTableReader>> = elem["hostExportColumns"].toFieldList()
+ val serverFields: List<ExportColumn<ServerTableReader>> = elem["serverExportColumns"].toFieldList()
+ val serviceFields: List<ExportColumn<ServiceTableReader>> = elem["serviceExportColumns"].toFieldList()
+
+ return ComputeExportConfig(
+ hostExportColumns = hostFields,
+ serverExportColumns = serverFields,
+ serviceExportColumns = serviceFields,
+ )
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: ComputeExportConfig,
+ ) {
+ encoder.encodeStructure(descriptor) {
+ encodeSerializableElement(
+ descriptor,
+ 0,
+ ColListSerializer(columnSerializer<HostTableReader>()),
+ value.hostExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ descriptor,
+ 1,
+ ColListSerializer(columnSerializer<ServerTableReader>()),
+ value.serverExportColumns.toList(),
+ )
+ encodeSerializableElement(
+ descriptor,
+ 2,
+ ColListSerializer(columnSerializer<ServiceTableReader>()),
+ value.serviceExportColumns.toList(),
+ )
+ }
+ }
+ }
+ }
+}
+
+private val json = Json { ignoreUnknownKeys = true }
+
+private inline fun <reified T : Exportable> JsonElement?.toFieldList(): List<ExportColumn<T>> =
+ this?.let {
+ json.decodeFromJsonElement(ColListSerializer(columnSerializer<T>()), it)
+ }?.ifEmpty {
+ ComputeExportConfig.LOG.warn(
+ "deserialized list of export columns for exportable ${T::class.simpleName} " +
+ "produced empty list, falling back to all loaded columns",
+ )
+ ExportColumn.getAllLoadedColumns<T>()
+ } ?: ExportColumn.getAllLoadedColumns<T>()
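For context, a brief usage sketch of the config defined above (not part of the diff; the column choices are illustrative). It goes through the secondary constructor, which always adds the base columns (`timestamp`, `timestamp_absolute`) on top of the selection, and uses `fmt()` to print the resulting selection:

```kotlin
import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import org.opendc.compute.telemetry.export.parquet.DfltHostExportColumns
import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns
import org.opendc.compute.telemetry.export.parquet.DfltServiceExportColumns

fun exampleConfig(): ComputeExportConfig {
    // Lists are passed so the secondary (Collection) constructor is chosen; it adds
    // BASE_EXPORT_COLUMNS to whatever is selected, so timestamps never need to be listed.
    val config =
        ComputeExportConfig(
            hostExportColumns = listOf(DfltHostExportColumns.CPU_USAGE, DfltHostExportColumns.POWER_DRAW),
            serverExportColumns = listOf(DfltServerExportColumns.CPU_LIMIT),
            serviceExportColumns = listOf(DfltServiceExportColumns.SERVERS_ACTIVE),
        )
    println(config.fmt()) // multi-line summary of the selected host/server/service columns
    return config
}
```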
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt
new file mode 100644
index 00000000..68b5a664
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltHostExportColumns.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solve ambiguity for field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows loading all the fields at once just by referencing its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltHostExportColumns
+ * ...
+ * ```
+ */
+public object DfltHostExportColumns {
+ public val TIMESTAMP: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOST_ID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { Binary.fromString(it.host.id) }
+
+ public val HOST_NAME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_name"),
+ ) { Binary.fromString(it.host.name) }
+
+ public val CPU_COUNT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("cpu_count"),
+ ) { it.host.cpuCount }
+
+ public val MEM_CAPACITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.host.memCapacity }
+
+ public val GUESTS_TERMINATED: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_terminated"),
+ ) { it.guestsTerminated }
+
+ public val GUESTS_RUNNING: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_running"),
+ ) { it.guestsRunning }
+
+ public val GUESTS_ERROR: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_error"),
+ ) { it.guestsError }
+
+ public val GUESTS_INVALID: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("guests_invalid"),
+ ) { it.guestsInvalid }
+
+ public val CPU_LIMIT: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_usage"),
+ ) { it.cpuUsage }
+
+ public val CPU_DEMAND: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_demand"),
+ ) { it.cpuDemand }
+
+ public val CPU_UTILIZATION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_utilization"),
+ ) { it.cpuUtilization }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val POWER_DRAW: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("power_draw"),
+ ) { it.powerDraw }
+
+ public val ENERGY_USAGE: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("energy_usage"),
+ ) { it.energyUsage }
+
+ public val CARBON_INTENSITY: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("carbon_intensity"),
+ ) { it.carbonIntensity }
+
+ public val CARBON_EMISSION: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("carbon_emission"),
+ ) { it.carbonEmission }
+
+ public val UP_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val BOOT_TIME: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<HostTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
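As the doc comment of this object notes, additional columns can be defined outside it. Below is a minimal sketch following the same `ExportColumn` pattern; the containing object and the derived `cpu_idle_capacity` metric are hypothetical, not part of the diff:

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Types
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.trace.util.parquet.exporter.ExportColumn

// Hypothetical user-defined columns; they become available for deserialization
// once this object is loaded by the JVM.
object MyHostExportColumns {
    val CPU_IDLE_CAPACITY: ExportColumn<HostTableReader> =
        ExportColumn(
            field = Types.required(DOUBLE).named("cpu_idle_capacity"),
        ) { it.cpuLimit - it.cpuUsage } // derived from fields already exposed by HostTableReader
}
```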
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt
new file mode 100644
index 00000000..91d6c9bf
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServerExportColumns.kt
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solve ambiguity for field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows loading all the fields at once just by referencing its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltServerExportColumns
+ * ...
+ * ```
+ */
+public object DfltServerExportColumns {
+ public val TIMESTAMP: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val SERVER_ID: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_id"),
+ ) { Binary.fromString(it.server.id) }
+
+ public val HOST_ID: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.optional(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ ) { it.host?.id?.let { Binary.fromString(it) } }
+
+ public val SERVER_NAME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field =
+ Types.required(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_name"),
+ ) { Binary.fromString(it.server.name) }
+
+ public val CPU_COUNT: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("cpu_count"),
+ ) { it.server.cpuCount }
+
+ public val MEM_CAPACITY: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("mem_capacity"),
+ ) { it.server.memCapacity }
+
+ public val CPU_LIMIT: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(DOUBLE).named("cpu_limit"),
+ ) { it.cpuLimit }
+
+ public val CPU_TIME_ACTIVE: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_active"),
+ ) { it.cpuActiveTime }
+
+ public val CPU_TIME_IDLE: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_idle"),
+ ) { it.cpuIdleTime }
+
+ public val CPU_TIME_STEAL: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_steal"),
+ ) { it.cpuStealTime }
+
+ public val CPU_TIME_LOST: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("cpu_time_lost"),
+ ) { it.cpuLostTime }
+
+ public val UP_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("uptime"),
+ ) { it.uptime }
+
+ public val DOWN_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("downtime"),
+ ) { it.downtime }
+
+ public val PROVISION_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("provision_time"),
+ ) { it.provisionTime?.toEpochMilli() }
+
+ public val BOOT_TIME: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time"),
+ ) { it.bootTime?.toEpochMilli() }
+
+ public val BOOT_TIME_ABS: ExportColumn<ServerTableReader> =
+ ExportColumn(
+ field = Types.optional(INT64).named("boot_time_absolute"),
+ ) { it.bootTimeAbsolute?.toEpochMilli() }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt
new file mode 100644
index 00000000..89396545
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltServiceExportColumns.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+/**
+ * This object wraps the [ExportColumn]s to solve ambiguity for field
+ * names that are included in more than one exportable.
+ *
+ * Additionally, it allows loading all the fields at once just by referencing its symbol,
+ * so that these columns can be deserialized. Additional fields can be added
+ * from anywhere, and they are deserializable as long as they are loaded by the JVM.
+ *
+ * ```kotlin
+ * ...
+ * // Loads the column
+ * DfltServiceExportColumns
+ * ...
+ * ```
+ */
+public object DfltServiceExportColumns {
+ public val TIMESTAMP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp"),
+ ) { it.timestamp.toEpochMilli() }
+
+ public val TIMESTAMP_ABS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT64).named("timestamp_absolute"),
+ ) { it.timestampAbsolute.toEpochMilli() }
+
+ public val HOSTS_UP: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("hosts_up"),
+ ) { it.hostsUp }
+
+ public val SERVERS_PENDING: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("servers_pending"),
+ ) { it.serversPending }
+
+ public val SERVERS_ACTIVE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("servers_active"),
+ ) { it.serversActive }
+
+ public val ATTEMPTS_SUCCESS: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_success"),
+ ) { it.attemptsSuccess }
+
+ public val ATTEMPTS_FAILURE: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_failure"),
+ ) { it.attemptsFailure }
+
+ public val ATTEMPTS_ERROR: ExportColumn<ServiceTableReader> =
+ ExportColumn(
+ field = Types.required(INT32).named("attempts_error"),
+ ) { it.attemptsError }
+
+ /**
+ * The columns that are always included in the output file.
+ */
+ internal val BASE_EXPORT_COLUMNS =
+ setOf(
+ TIMESTAMP_ABS,
+ TIMESTAMP,
+ )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
index 1c910497..6bea4cc2 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -26,45 +26,100 @@ import org.opendc.compute.telemetry.ComputeMonitor
import org.opendc.compute.telemetry.table.HostTableReader
import org.opendc.compute.telemetry.table.ServerTableReader
import org.opendc.compute.telemetry.table.ServiceTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+import org.opendc.trace.util.parquet.exporter.Exportable
+import org.opendc.trace.util.parquet.exporter.Exporter
import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter =
- ParquetServerDataWriter(
- File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- private val hostWriter =
- ParquetHostDataWriter(
- File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- private val serviceWriter =
- ParquetServiceDataWriter(
- File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
- bufferSize,
- )
-
- override fun record(reader: ServerTableReader) {
- serverWriter.write(reader)
+public class ParquetComputeMonitor(
+ private val hostExporter: Exporter<HostTableReader>,
+ private val serverExporter: Exporter<ServerTableReader>,
+ private val serviceExporter: Exporter<ServiceTableReader>,
+) : ComputeMonitor, AutoCloseable {
+ override fun record(reader: HostTableReader) {
+ hostExporter.write(reader)
}
- override fun record(reader: HostTableReader) {
- hostWriter.write(reader)
+ override fun record(reader: ServerTableReader) {
+ serverExporter.write(reader)
}
override fun record(reader: ServiceTableReader) {
- serviceWriter.write(reader)
+ serviceExporter.write(reader)
}
override fun close() {
- hostWriter.close()
- serviceWriter.close()
- serverWriter.close()
+ hostExporter.close()
+ serverExporter.close()
+ serviceExporter.close()
+ }
+
+ public companion object {
+ /**
+ * Overloaded constructor with [ComputeExportConfig] as parameter.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ computeExportConfig: ComputeExportConfig,
+ ): ParquetComputeMonitor =
+ invoke(
+ base = base,
+ partition = partition,
+ bufferSize = bufferSize,
+ hostExportColumns = computeExportConfig.hostExportColumns,
+ serverExportColumns = computeExportConfig.serverExportColumns,
+ serviceExportColumns = computeExportConfig.serviceExportColumns,
+ )
+
+ /**
+ * Constructor that loads default [ExportColumn]s defined in
+ * [DfltHostExportColumns], [DfltServerExportColumns], [DfltServiceExportColumns]
+ * in case optional parameters are omitted and all fields need to be retrieved.
+ *
+ * @param[base] parent pathname for output file.
+ * @param[partition] child pathname for output file.
+ * @param[bufferSize] size of the buffer used by the writer thread.
+ */
+ public operator fun invoke(
+ base: File,
+ partition: String,
+ bufferSize: Int,
+ hostExportColumns: Collection<ExportColumn<HostTableReader>>? = null,
+ serverExportColumns: Collection<ExportColumn<ServerTableReader>>? = null,
+ serviceExportColumns: Collection<ExportColumn<ServiceTableReader>>? = null,
+ ): ParquetComputeMonitor {
+ // Loads the fields in case they need to be retrieved if optional params are omitted.
+ ComputeExportConfig.loadDfltColumns()
+
+ return ParquetComputeMonitor(
+ hostExporter =
+ Exporter(
+ outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
+ columns = hostExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ serverExporter =
+ Exporter(
+ outputFile = File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
+ columns = serverExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ serviceExporter =
+ Exporter(
+ outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
+ columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(),
+ bufferSize = bufferSize,
+ ),
+ )
+ }
}
}
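A brief usage sketch of the companion `invoke` overload above (not part of the diff; the output directory, partition name, and buffer size are illustrative):

```kotlin
import java.io.File
import org.opendc.compute.telemetry.export.parquet.ComputeExportConfig
import org.opendc.compute.telemetry.export.parquet.ParquetComputeMonitor

fun exampleMonitor(): ParquetComputeMonitor =
    ParquetComputeMonitor(
        base = File("output"),                                  // hypothetical output root
        partition = "seed=0",                                   // hypothetical partition sub-directory
        bufferSize = 4096,                                      // hypothetical writer buffer size
        computeExportConfig = ComputeExportConfig.ALL_COLUMNS,  // or a custom column selection
    )
```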
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
deleted file mode 100644
index b96ee28b..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import mu.KotlinLogging
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A writer that writes data in Parquet format.
- *
- * @param path The path to the file to write the data to.
- * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
- */
-public abstract class ParquetDataWriter<in T>(
- path: File,
- private val writeSupport: WriteSupport<T>,
- bufferSize: Int = 4096,
-) : AutoCloseable {
- /**
- * The logging instance to use.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The queue of records to process.
- */
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
-
- /**
- * An exception to be propagated to the actual writer.
- */
- private var exception: Throwable? = null
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread =
- thread(start = false, name = this.toString()) {
- val writer =
- let {
- val builder =
- LocalParquetWriter.builder(path.toPath(), writeSupport)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- buildWriter(builder)
- }
-
- val queue = queue
- val buf = mutableListOf<T>()
- var shouldStop = false
-
- try {
- while (!shouldStop) {
- try {
- writer.write(queue.take())
- } catch (e: InterruptedException) {
- shouldStop = true
- }
-
- if (queue.drainTo(buf) > 0) {
- for (data in buf) {
- writer.write(data)
- }
- buf.clear()
- }
- }
- } catch (e: Throwable) {
- logger.error(e) { "Failure in Parquet data writer" }
- exception = e
- } finally {
- writer.close()
- }
- }
-
- /**
- * Build the [ParquetWriter] used to write the Parquet files.
- */
- protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
- return builder.build()
- }
-
- /**
- * Write the specified metrics to the database.
- */
- public fun write(data: T) {
- val exception = exception
- if (exception != null) {
- throw IllegalStateException("Writer thread failed", exception)
- }
-
- queue.put(data)
- }
-
- /**
- * Signal the writer to stop.
- */
- override fun close() {
- writerThread.interrupt()
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
deleted file mode 100644
index 020e67f2..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.HostTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-
-/**
- * A Parquet event writer for [HostTableReader]s.
- */
-public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
- return builder
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "host-writer"
-
- /**
- * A [WriteSupport] implementation for a [HostTableReader].
- */
- private class HostDataWriteSupport() : WriteSupport<HostTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: HostTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: HostTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("host_id", 2)
- consumer.addBinary(Binary.fromString(data.host.id))
- consumer.endField("host_id", 2)
-
- consumer.startField("host_name", 3)
- consumer.addBinary(Binary.fromString(data.host.name))
- consumer.endField("host_name", 3)
-
- consumer.startField("cpu_count", 4)
- consumer.addInteger(data.host.cpuCount)
- consumer.endField("cpu_count", 4)
-
- consumer.startField("mem_capacity", 5)
- consumer.addLong(data.host.memCapacity)
- consumer.endField("mem_capacity", 5)
-
- consumer.startField("guests_terminated", 6)
- consumer.addInteger(data.guestsTerminated)
- consumer.endField("guests_terminated", 6)
-
- consumer.startField("guests_running", 7)
- consumer.addInteger(data.guestsRunning)
- consumer.endField("guests_running", 7)
-
- consumer.startField("guests_error", 8)
- consumer.addInteger(data.guestsError)
- consumer.endField("guests_error", 8)
-
- consumer.startField("guests_invalid", 9)
- consumer.addInteger(data.guestsInvalid)
- consumer.endField("guests_invalid", 9)
-
- consumer.startField("cpu_limit", 10)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 10)
-
- consumer.startField("cpu_usage", 11)
- consumer.addDouble(data.cpuUsage)
- consumer.endField("cpu_usage", 11)
-
- consumer.startField("cpu_demand", 12)
- consumer.addDouble(data.cpuUsage)
- consumer.endField("cpu_demand", 12)
-
- consumer.startField("cpu_utilization", 13)
- consumer.addDouble(data.cpuUtilization)
- consumer.endField("cpu_utilization", 13)
-
- consumer.startField("cpu_time_active", 14)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 14)
-
- consumer.startField("cpu_time_idle", 15)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 15)
-
- consumer.startField("cpu_time_steal", 16)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 16)
-
- consumer.startField("cpu_time_lost", 17)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 17)
-
- consumer.startField("power_draw", 18)
- consumer.addDouble(data.powerDraw)
- consumer.endField("power_draw", 18)
-
- consumer.startField("energy_usage", 19)
- consumer.addDouble(data.energyUsage)
- consumer.endField("energy_usage", 19)
-
- consumer.startField("carbon_intensity", 20)
- consumer.addDouble(data.carbonIntensity)
- consumer.endField("carbon_intensity", 20)
-
- consumer.startField("carbon_emission", 21)
- consumer.addDouble(data.carbonEmission)
- consumer.endField("carbon_emission", 21)
-
- consumer.startField("uptime", 22)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 22)
-
- consumer.startField("downtime", 23)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 23)
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 24)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 24)
- }
-
- val bootTimeAbsolute = data.bootTimeAbsolute
- if (bootTimeAbsolute != null) {
- consumer.startField("boot_time_absolute", 25)
- consumer.addLong(bootTimeAbsolute.toEpochMilli())
- consumer.endField("boot_time_absolute", 25)
- }
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the host data.
- */
- val SCHEMA: MessageType =
- Types
- .buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_name"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_terminated"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_running"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_error"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_invalid"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_demand"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_utilization"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("power_draw"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("energy_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("carbon_intensity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("carbon_emission"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time_absolute"),
- )
- .named("host")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
deleted file mode 100644
index e1b489ac..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.Binary
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.LogicalTypeAnnotation
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.ServerTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-
-/**
- * A Parquet event writer for [ServerTableReader]s.
- */
-public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
- return builder
- .withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "server-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServerTableReader].
- */
- private class ServerDataWriteSupport() : WriteSupport<ServerTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServerTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: ServerTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("server_id", 2)
- consumer.addBinary(Binary.fromString(data.server.id))
- consumer.endField("server_id", 2)
-
- consumer.startField("server_name", 3)
- consumer.addBinary(Binary.fromString(data.server.name))
- consumer.endField("server_name", 3)
-
- val hostId = data.host?.id
- if (hostId != null) {
- consumer.startField("host_id", 4)
- consumer.addBinary(Binary.fromString(hostId))
- consumer.endField("host_id", 4)
- }
-
- consumer.startField("mem_capacity", 5)
- consumer.addLong(data.server.memCapacity)
- consumer.endField("mem_capacity", 5)
-
- consumer.startField("cpu_count", 6)
- consumer.addInteger(data.server.cpuCount)
- consumer.endField("cpu_count", 6)
-
- consumer.startField("cpu_limit", 7)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 7)
-
- consumer.startField("cpu_time_active", 8)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 8)
-
- consumer.startField("cpu_time_idle", 9)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 9)
-
- consumer.startField("cpu_time_steal", 10)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 10)
-
- consumer.startField("cpu_time_lost", 11)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 11)
-
- consumer.startField("uptime", 12)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 12)
-
- consumer.startField("downtime", 13)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 13)
-
- val provisionTime = data.provisionTime
- if (provisionTime != null) {
- consumer.startField("provision_time", 14)
- consumer.addLong(provisionTime.toEpochMilli())
- consumer.endField("provision_time", 14)
- }
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 15)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 15)
- }
-
- val bootTimeAbsolute = data.bootTimeAbsolute
- if (bootTimeAbsolute != null) {
- consumer.startField("boot_time_absolute", 16)
- consumer.addLong(bootTimeAbsolute.toEpochMilli())
- consumer.endField("boot_time_absolute", 16)
- }
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the server data.
- */
- val SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_name"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("provision_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("boot_time_absolute"),
- )
- .named("server")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
deleted file mode 100644
index eba8fc4f..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.schema.PrimitiveType
-import org.apache.parquet.schema.Types
-import org.opendc.compute.telemetry.table.ServiceTableReader
-import java.io.File
-
-/**
- * A Parquet event writer for [ServiceTableReader]s.
- */
-public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
- override fun toString(): String = "service-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServiceTableReader].
- */
- private class ServiceDataWriteSupport() : WriteSupport<ServiceTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServiceTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(
- consumer: RecordConsumer,
- data: ServiceTableReader,
- ) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("timestamp_absolute", 1)
- consumer.addLong(data.timestampAbsolute.toEpochMilli())
- consumer.endField("timestamp_absolute", 1)
-
- consumer.startField("hosts_up", 2)
- consumer.addInteger(data.hostsUp)
- consumer.endField("hosts_up", 2)
-
- consumer.startField("hosts_down", 3)
- consumer.addInteger(data.hostsDown)
- consumer.endField("hosts_down", 3)
-
- consumer.startField("servers_pending", 4)
- consumer.addInteger(data.serversPending)
- consumer.endField("servers_pending", 4)
-
- consumer.startField("servers_active", 5)
- consumer.addInteger(data.serversActive)
- consumer.endField("servers_active", 5)
-
- consumer.startField("attempts_success", 6)
- consumer.addInteger(data.attemptsSuccess)
-            consumer.endField("attempts_success", 6)
-
- consumer.startField("attempts_failure", 7)
- consumer.addInteger(data.attemptsFailure)
- consumer.endField("attempts_failure", 7)
-
- consumer.startField("attempts_error", 8)
- consumer.addInteger(data.attemptsError)
- consumer.endField("attempts_error", 8)
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- private val SCHEMA: MessageType =
- Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("timestamp_absolute"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_up"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_down"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_pending"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_success"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_failure"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_error"),
- )
- .named("service")
- }
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
new file mode 100644
index 00000000..f48bc229
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/README.md
@@ -0,0 +1,70 @@
+### Summary
+Added an output configuration, which can be defined in the scenario `.json` file, that allows selecting which columns are included in the raw output files `host.parquet`, `server.parquet` and `service.parquet`.
+
+### Columns
+The 'default' columns are defined in `DfltHostExportColumns`, `DfltServerExportColumns` and `DfltServiceExportColumns`. Any number of additional columns can be defined anywhere (as `ExportColumn<Exportable>`), and they remain deserializable as long as they are loaded by the JVM.
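+
+As an illustration, an additional column could be declared in the same style as the defaults. The sketch below is hypothetical: the exact `ExportColumn` constructor (a Parquet field plus a getter lambda) is assumed to mirror the default column files and may differ in the actual code base.
+```kotlin
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.exporter.ExportColumn
+
+// Hypothetical custom column: exports the host timestamp in seconds instead of milliseconds.
+// The constructor shape shown here is an assumption and may need to be adapted.
+public val TIMESTAMP_SECONDS: ExportColumn<HostTableReader> =
+    ExportColumn(field = Types.required(INT64).named("timestamp_seconds")) { it.timestamp.epochSecond }
+```
+Once the object (or file) declaring such a column is loaded by the JVM, its name can be listed in the scenario configuration like any default column.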
+
+### Deserialization
+Each `ExportColumn` has a `Regex` used for deserialization. If no custom regex is provided, the default one is used. The default regex matches the column name in a case-insensitive manner, with either `_` (as in the name) or a blank space as separator.
+
+###### E.g.:
+***column name*** = "cpu\_count"
+***default column regex*** = "\\s*(?:cpu_count|cpu count)\\s*" (case-insensitive)
+***matches*** = "cpu\_count", "cpu count", "CpU\_cOuNt", etc.
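+
+The matching behaviour described above can be reproduced with a plain Kotlin `Regex` (a standalone sketch, not code taken from the exporter):
+```kotlin
+// Default-style regex for the "cpu_count" column: `_` or blank space, case-insensitive.
+val columnRegex = Regex("\\s*(?:cpu_count|cpu count)\\s*", RegexOption.IGNORE_CASE)
+
+fun main() {
+    println(columnRegex.matches("cpu_count"))   // true
+    println(columnRegex.matches(" CPU COUNT ")) // true: surrounding blanks and case are ignored
+    println(columnRegex.matches("cpu-count"))   // false: unmatched names are ignored with an error log
+}
+```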
+
+### JSON Schema
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+   "type": "object",
+   "properties": {
+     "hostExportColumns": { "type": "array" },
+     "serverExportColumns": { "type": "array" },
+     "serviceExportColumns": { "type": "array" }
+   },
+   "required": [ /* NONE REQUIRED */ ]
+ },
+ ...
+ "required": [
+ ...
+ // NOT REQUIRED
+ ]
+}
+```
+
+&nbsp;
+###### Bad Formatting Cases
+- If a column name (and type) does not match any deserializable column, the entry is ignored and an error message is logged.
+- If an empty list of columns is provided, or none of the provided entries could be deserialized, then all loaded columns for that `Exportable` are used and a warning message is logged.
+- If no list is provided, then all loaded columns for that `Exportable` are used.
+
+
+### Example
+
+```json
+// scenario.json
+{
+ ...
+ "computeExportConfig": {
+ "hostExportColumns": ["timestamp", "timestamp_absolute", "invalid-entry1", "guests_invalid"],
+ "serverExportColumns": ["invalid-entry2"],
+ "serviceExportColumns": ["timestamp", "servers_active", "servers_pending"]
+ },
+ ...
+```
+
+```json
+// console output
+10:51:56.561 [ERROR] ColListSerializer - no match found for column "invalid-entry1", ignoring...
+10:51:56.563 [ERROR] ColListSerializer - no match found for column "invalid-entry2", ignoring...
+10:51:56.564 [WARN] ComputeExportConfig - deserialized list of export columns for exportable ServerTableReader produced empty list, falling back to all loaded columns
+10:51:56.584 [INFO] ScenariosSpec -
+| === Compute Export Config ===
+| Host columns : timestamp, timestamp_absolute, guests_invalid
+| Server columns : timestamp, timestamp_absolute, server_id, server_name, cpu_count, mem_capacity, cpu_limit, cpu_time_active, cpu_time_idle, cpu_time_steal, cpu_time_lost, uptime, downtime, provision_time, boot_time, boot_time_absolute
+| Service columns : timestamp, servers_active, servers_pending
+
+```
+
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
deleted file mode 100644
index a2e82df1..00000000
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.telemetry.export.parquet
-
-import org.apache.parquet.io.api.Binary
-import java.nio.ByteBuffer
-import java.util.UUID
-
-/**
- * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
- */
-internal fun UUID.toBinary(): Binary {
- val bb = ByteBuffer.allocate(16)
- bb.putLong(mostSignificantBits)
- bb.putLong(leastSignificantBits)
- bb.rewind()
- return Binary.fromConstantByteBuffer(bb)
-}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
index d41c6dc0..a7b8bedb 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
@@ -22,12 +22,13 @@
package org.opendc.compute.telemetry.table
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a host trace entry.
*/
-public interface HostTableReader {
+public interface HostTableReader : Exportable {
public fun copy(): HostTableReader
public fun setValues(table: HostTableReader)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
index 51113025..a1aed778 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.telemetry.table
+import org.opendc.compute.telemetry.export.parquet.DfltServerExportColumns
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a server trace entry.
*/
-public interface ServerTableReader {
+public interface ServerTableReader : Exportable {
public fun copy(): ServerTableReader
public fun setValues(table: ServerTableReader)
@@ -102,3 +104,6 @@ public interface ServerTableReader {
*/
public val cpuLostTime: Long
}
+
+// Loads the default export fields for deserialization whenever this file is loaded.
+private val _ignore = DfltServerExportColumns
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
index e6c2a1ae..c3a92fc7 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
@@ -22,12 +22,13 @@
package org.opendc.compute.telemetry.table
+import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
/**
* An interface that is used to read a row of a service trace entry.
*/
-public interface ServiceTableReader {
+public interface ServiceTableReader : Exportable {
public fun copy(): ServiceTableReader
public fun setValues(table: ServiceTableReader)