summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt44
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt15
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt2
4 files changed, 63 insertions, 16 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
new file mode 100644
index 00000000..a4676f31
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/AvroUtils.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("AvroUtils")
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
+
+/**
+ * Schema for UUID type.
+ */
+internal val UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+
+/**
+ * Schema for timestamp type.
+ */
+internal val TIMESTAMP_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+
+/**
+ * Helper function to make a [Schema] field optional.
+ */
+internal fun Schema.optional(): Schema {
+ return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 36207045..58388cb1 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -47,8 +47,6 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["host_id"] = data.host.id
- builder["num_cpus"] = data.host.cpuCount
- builder["mem_capacity"] = data.host.memCapacity
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
@@ -57,12 +55,15 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
builder["boot_time"] = bootTime.toEpochMilli()
}
+ builder["cpu_count"] = data.host.cpuCount
builder["cpu_limit"] = data.cpuLimit
builder["cpu_time_active"] = data.cpuActiveTime
builder["cpu_time_idle"] = data.cpuIdleTime
builder["cpu_time_steal"] = data.cpuStealTime
builder["cpu_time_lost"] = data.cpuLostTime
+ builder["mem_limit"] = data.host.memCapacity
+
builder["power_total"] = data.powerTotal
builder["guests_terminated"] = data.guestsTerminated
@@ -78,18 +79,18 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.record("host")
.namespace("org.opendc.telemetry.compute")
.fields()
- .requiredLong("timestamp")
- .requiredString("host_id")
- .requiredInt("num_cpus")
- .requiredLong("mem_capacity")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA).noDefault()
.requiredLong("uptime")
.requiredLong("downtime")
- .optionalLong("boot_time")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
+ .requiredInt("cpu_count")
.requiredDouble("cpu_limit")
.requiredLong("cpu_time_active")
.requiredLong("cpu_time_idle")
.requiredLong("cpu_time_steal")
.requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
.requiredDouble("power_total")
.requiredInt("guests_terminated")
.requiredInt("guests_running")
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
index c5a5e7c0..43b5f469 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -30,6 +30,7 @@ import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.ServerData
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [ServerData]s.
@@ -49,8 +50,6 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
builder["server_id"] = data.server.id
builder["host_id"] = data.host?.id
- builder["num_vcpus"] = data.server.cpuCount
- builder["mem_capacity"] = data.server.memCapacity
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
@@ -60,11 +59,14 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
}
builder["scheduling_latency"] = data.schedulingLatency
+ builder["cpu_count"] = data.server.cpuCount
builder["cpu_limit"] = data.cpuLimit
builder["cpu_time_active"] = data.cpuActiveTime
builder["cpu_time_idle"] = data.cpuIdleTime
builder["cpu_time_steal"] = data.cpuStealTime
builder["cpu_time_lost"] = data.cpuLostTime
+
+ builder["mem_limit"] = data.server.memCapacity
}
override fun toString(): String = "server-writer"
@@ -74,20 +76,20 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.record("server")
.namespace("org.opendc.telemetry.compute")
.fields()
- .requiredLong("timestamp")
- .requiredString("server_id")
- .optionalString("host_id")
- .requiredInt("num_vcpus")
- .requiredLong("mem_capacity")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
+ .name("server_id").type(UUID_SCHEMA).noDefault()
+ .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
.requiredLong("uptime")
.requiredLong("downtime")
- .optionalLong("boot_time")
+ .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
.requiredLong("scheduling_latency")
+ .requiredInt("cpu_count")
.requiredDouble("cpu_limit")
.requiredLong("cpu_time_active")
.requiredLong("cpu_time_idle")
.requiredLong("cpu_time_steal")
.requiredLong("cpu_time_lost")
+ .requiredLong("mem_limit")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index d9ca55cb..2928f445 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -52,7 +52,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
.record("service")
.namespace("org.opendc.telemetry.compute")
.fields()
- .requiredLong("timestamp")
+ .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
.requiredInt("hosts_up")
.requiredInt("hosts_down")
.requiredInt("servers_pending")