4 files changed, 63 insertions, 16 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
new file mode 100644
index 00000000..a4676f31
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("AvroUtils")
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
+
+/**
+ * Schema for UUID type.
+ */
+internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+
+/**
+ * Schema for timestamp type.
+ */
+internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+
+/**
+ * Helper function to make a [Schema] field optional.
+ */
+internal fun Schema.optional(): Schema {
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 36207045..58388cb1 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -47,8 +47,6 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
         builder["timestamp"] = data.timestamp.toEpochMilli()
         builder["host_id"] = data.host.id
-        builder["num_cpus"] = data.host.cpuCount
-        builder["mem_capacity"] = data.host.memCapacity
 
         builder["uptime"] = data.uptime
         builder["downtime"] = data.downtime
@@ -57,12 +55,15 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
             builder["boot_time"] = bootTime.toEpochMilli()
         }
 
+        builder["cpu_count"] = data.host.cpuCount
         builder["cpu_limit"] = data.cpuLimit
         builder["cpu_time_active"] = data.cpuActiveTime
         builder["cpu_time_idle"] = data.cpuIdleTime
         builder["cpu_time_steal"] = data.cpuStealTime
         builder["cpu_time_lost"] = data.cpuLostTime
+        builder["mem_limit"] = data.host.memCapacity
+
         builder["power_total"] = data.powerTotal
 
         builder["guests_terminated"] = data.guestsTerminated
@@ -78,18 +79,18 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
             .record("host")
             .namespace("org.opendc.telemetry.compute")
             .fields()
-            .requiredLong("timestamp")
-            .requiredString("host_id")
-            .requiredInt("num_cpus")
-            .requiredLong("mem_capacity")
+            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+            .name("host_id").type(UUID_SCHEMA).noDefault()
             .requiredLong("uptime")
             .requiredLong("downtime")
-            .optionalLong("boot_time")
+            .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+            .requiredInt("cpu_count")
             .requiredDouble("cpu_limit")
             .requiredLong("cpu_time_active")
             .requiredLong("cpu_time_idle")
             .requiredLong("cpu_time_steal")
             .requiredLong("cpu_time_lost")
+            .requiredLong("mem_limit")
             .requiredDouble("power_total")
             .requiredInt("guests_terminated")
             .requiredInt("guests_running")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
index c5a5e7c0..43b5f469 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -30,6 +30,7 @@ import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter
 import org.opendc.telemetry.compute.table.ServerData
 import java.io.File
+import java.util.*
 
 /**
  * A Parquet event writer for [ServerData]s.
@@ -49,8 +50,6 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
         builder["server_id"] = data.server.id
         builder["host_id"] = data.host?.id
-        builder["num_vcpus"] = data.server.cpuCount
-        builder["mem_capacity"] = data.server.memCapacity
 
         builder["uptime"] = data.uptime
         builder["downtime"] = data.downtime
@@ -60,11 +59,14 @@
             builder["boot_time"] = bootTime.toEpochMilli()
         }
 
         builder["scheduling_latency"] = data.schedulingLatency
+        builder["cpu_count"] = data.server.cpuCount
         builder["cpu_limit"] = data.cpuLimit
         builder["cpu_time_active"] = data.cpuActiveTime
         builder["cpu_time_idle"] = data.cpuIdleTime
         builder["cpu_time_steal"] = data.cpuStealTime
         builder["cpu_time_lost"] = data.cpuLostTime
+
+        builder["mem_limit"] = data.server.memCapacity
     }
 
     override fun toString(): String = "server-writer"
@@ -74,20 +76,20 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
             .record("server")
             .namespace("org.opendc.telemetry.compute")
             .fields()
-            .requiredLong("timestamp")
-            .requiredString("server_id")
-            .optionalString("host_id")
-            .requiredInt("num_vcpus")
-            .requiredLong("mem_capacity")
+            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+            .name("server_id").type(UUID_SCHEMA).noDefault()
+            .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
             .requiredLong("uptime")
             .requiredLong("downtime")
-            .optionalLong("boot_time")
+            .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
             .requiredLong("scheduling_latency")
+            .requiredInt("cpu_count")
             .requiredDouble("cpu_limit")
             .requiredLong("cpu_time_active")
             .requiredLong("cpu_time_idle")
             .requiredLong("cpu_time_steal")
             .requiredLong("cpu_time_lost")
+            .requiredLong("mem_limit")
             .endRecord()
     }
 }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index d9ca55cb..2928f445 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -52,7 +52,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
             .record("service")
             .namespace("org.opendc.telemetry.compute")
            .fields()
-            .requiredLong("timestamp")
+            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
             .requiredInt("hosts_up")
             .requiredInt("hosts_down")
             .requiredInt("servers_pending")
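For context, the sketch below (not part of the change set) shows how the helpers introduced in AvroUtils.kt combine with Avro's SchemaBuilder and a GenericRecordBuilder: logical types for timestamp and UUID columns, and a null union for optional fields such as boot_time. The record name "example", the chosen fields, and the GenericRecordBuilder usage are illustrative assumptions, not code from this commit.

// Illustrative sketch only -- mirrors the pattern of this commit, not actual OpenDC code.
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import java.time.Instant
import java.util.UUID

// Same helpers as AvroUtils.kt in the diff.
val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
fun Schema.optional(): Schema = Schema.createUnion(Schema.create(Schema.Type.NULL), this)

fun main() {
    // Hypothetical record schema in the writers' new style: logical types for
    // timestamp/UUID columns, a null union for the optional boot_time field.
    val schema: Schema = SchemaBuilder
        .record("example")
        .namespace("org.opendc.telemetry.compute")
        .fields()
        .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
        .name("host_id").type(UUID_SCHEMA).noDefault()
        .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
        .requiredInt("cpu_count")
        .requiredLong("mem_limit")
        .endRecord()

    // Values are set in their raw Avro representation: epoch millis for
    // timestamp-millis, the string form for uuid, null for an absent optional field.
    val record = GenericRecordBuilder(schema)
        .set("timestamp", Instant.now().toEpochMilli())
        .set("host_id", UUID.randomUUID().toString())
        .set("boot_time", null)
        .set("cpu_count", 8)
        .set("mem_limit", 32L * 1024 * 1024 * 1024)
        .build()

    // The populated record validates against the schema, including the nullable union.
    println(GenericData.get().validate(schema, record))
}

The same record could then be handed to an AvroParquetWriter built with this schema, which is what the ParquetHostDataWriter, ParquetServerDataWriter, and ParquetServiceDataWriter classes do.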
