summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-telemetry
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry')
-rw-r--r--opendc-compute/opendc-compute-telemetry/build.gradle.kts40
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt508
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt47
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt67
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt132
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt233
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt208
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt131
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt38
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt28
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt130
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt37
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt95
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt57
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt80
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml38
16 files changed, 1869 insertions, 0 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
new file mode 100644
index 00000000..c403ccb9
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "OpenDC Compute Service implementation"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(projects.opendcCompute.opendcComputeApi)
+ implementation(projects.opendcCommon)
+ implementation(libs.kotlin.logging)
+ implementation(project(mapOf("path" to ":opendc-trace:opendc-trace-parquet")))
+ implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-service")))
+
+ testImplementation(projects.opendcSimulator.opendcSimulatorCore)
+ testRuntimeOnly(libs.log4j.core)
+ testRuntimeOnly(libs.log4j.slf4j)
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
new file mode 100644
index 00000000..3a1fed1f
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
@@ -0,0 +1,508 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.telemetry.table.HostInfo
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.compute.telemetry.table.ServerInfo
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
+ * export interval.
+ *
+ * @param dispatcher A [Dispatcher] for scheduling the future events.
+ * @param service The [ComputeService] to monitor.
+ * @param monitor The monitor to export the metrics to.
+ * @param exportInterval The export interval.
+ */
+public class ComputeMetricReader(
+ dispatcher: Dispatcher,
+ private val service: ComputeService,
+ private val monitor: ComputeMonitor,
+ private val exportInterval: Duration = Duration.ofMinutes(5)
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
+ private val clock = dispatcher.timeSource
+
+ /**
+ * Aggregator for service metrics.
+ */
+ private val serviceTableReader = ServiceTableReaderImpl(service)
+
+ /**
+ * Mapping from [Host] instances to [HostTableReaderImpl]
+ */
+ private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>()
+
+ /**
+ * Mapping from [Server] instances to [ServerTableReaderImpl]
+ */
+ private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>()
+
+ /**
+ * The background job that is responsible for collecting the metrics every cycle.
+ */
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ loggState()
+ }
+ } finally {
+ loggState()
+
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
+ }
+ }
+
+ private fun loggState() {
+ try {
+ val now = this.clock.instant()
+
+ for (host in this.service.hosts) {
+ val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ for (server in this.service.servers) {
+ val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ this.monitor.record(reader.copy())
+ reader.reset()
+ }
+
+ this.serviceTableReader.record(now)
+ monitor.record(this.serviceTableReader.copy())
+ } catch (cause: Throwable) {
+ this.logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+
+ override fun close() {
+ job.cancel()
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
+
+ override fun copy(): ServiceTableReader {
+ val newServiceTable = ServiceTableReaderImpl(service)
+ newServiceTable.setValues(this)
+
+ return newServiceTable
+ }
+
+ override fun setValues(table: ServiceTableReader) {
+ _timestamp = table.timestamp
+
+ _hostsUp = table.hostsUp
+ _hostsDown = table.hostsDown
+ _serversTotal = table.serversTotal
+ _serversPending = table.serversPending
+ _serversActive = table.serversActive
+ _attemptsSuccess = table.attemptsSuccess
+ _attemptsFailure = table.attemptsFailure
+ _attemptsError = table.attemptsError
+ }
+
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val serversTotal: Int
+ get() = _serversTotal
+ private var _serversTotal = 0
+
+ override val serversPending: Int
+ get() = _serversPending
+ private var _serversPending = 0
+
+ override val serversActive: Int
+ get() = _serversActive
+ private var _serversActive = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ private var _attemptsError = 0
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ _timestamp = now
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _serversTotal = stats.serversTotal
+ _serversPending = stats.serversPending
+ _serversActive = stats.serversActive
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ _attemptsError = stats.attemptsError.toInt()
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostTableReaderImpl(host: Host) : HostTableReader {
+ override fun copy(): HostTableReader {
+ val newHostTable = HostTableReaderImpl(_host)
+ newHostTable.setValues(this)
+
+ return newHostTable
+ }
+
+ override fun setValues(table: HostTableReader) {
+ _timestamp = table.timestamp
+ _guestsTerminated = table.guestsTerminated
+ _guestsRunning = table.guestsRunning
+ _guestsError = table.guestsError
+ _guestsInvalid = table.guestsInvalid
+ _cpuLimit = table.cpuLimit
+ _cpuDemand = table.cpuDemand
+ _cpuUsage = table.cpuUsage
+ _cpuUtilization = table.cpuUtilization
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _powerUsage = table.powerUsage
+ _powerTotal = table.powerTotal
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _bootTime = table.bootTime
+ }
+
+ private val _host = host
+
+ override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ private var _powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerUsage = hostSysStats.powerUsage
+ _powerTotal = hostSysStats.energyUsage
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
+ override fun copy(): ServerTableReader {
+ val newServerTable = ServerTableReaderImpl(service, _server)
+ newServerTable.setValues(this)
+
+ return newServerTable
+ }
+
+ override fun setValues(table: ServerTableReader) {
+ host = table.host
+
+ _timestamp = table.timestamp
+ _cpuLimit = table.cpuLimit
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _provisionTime = table.provisionTime
+ _bootTime = table.bootTime
+ }
+
+ private val _server = server
+
+ /**
+ * The static information about this server.
+ */
+ override val server = ServerInfo(
+ server.uid.toString(),
+ server.name,
+ "vm",
+ "x86",
+ server.image.uid.toString(),
+ server.image.name,
+ server.flavor.cpuCount,
+ server.flavor.memorySize
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: Host? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val newHost = service.lookupHost(_server)
+ if (newHost != null && newHost.uid != _host?.uid) {
+ _host = newHost
+ host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
+ }
+
+ val cpuStats = _host?.getCpuStats(_server)
+ val sysStats = _host?.getSystemStats(_server)
+
+ _timestamp = now
+ _cpuLimit = cpuStats?.capacity ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = _server.launchedAt
+ _bootTime = sysStats?.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt
new file mode 100644
index 00000000..b236a7df
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMonitor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry
+
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.compute.telemetry.table.ServiceTableReader
+
+/**
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
+ */
+public interface ComputeMonitor {
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: ServerTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: HostTableReader) {}
+
+ /**
+ * Record an entry with the specified [reader].
+ */
+ public fun record(reader: ServiceTableReader) {}
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
new file mode 100644
index 00000000..2a4f27d4
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.opendc.compute.telemetry.ComputeMonitor
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import java.io.File
+
+/**
+ * A [ComputeMonitor] that logs the events to a Parquet file.
+ */
+public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val hostWriter = ParquetHostDataWriter(
+ File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ private val serviceWriter = ParquetServiceDataWriter(
+ File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
+ }
+
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
+ }
+
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
+ }
+
+ override fun close() {
+ hostWriter.close()
+ serviceWriter.close()
+ serverWriter.close()
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..34a75d75
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import mu.KotlinLogging
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
+ */
+public abstract class ParquetDataWriter<in T>(
+ path: File,
+ private val writeSupport: WriteSupport<T>,
+ bufferSize: Int = 4096
+) : AutoCloseable {
+ /**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The queue of records to process.
+ */
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * An exception to be propagated to the actual writer.
+ */
+ private var exception: Throwable? = null
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ writer.write(queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ writer.write(data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
+ return builder.build()
+ }
+
+ /**
+ * Write the specified metrics to the database.
+ */
+ public fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
+ }
+
+ /**
+ * Signal the writer to stop.
+ */
+ override fun close() {
+ writerThread.interrupt()
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
new file mode 100644
index 00000000..4cf9c6f1
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.HostTableReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+
+/**
+ * A Parquet event writer for [HostTableReader]s.
+ */
+public class ParquetHostDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
+
+ override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun toString(): String = "host-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [HostTableReader].
+ */
+ private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: HostTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("host_id", 1)
+ consumer.addBinary(Binary.fromString(data.host.id))
+ consumer.endField("host_id", 1)
+
+ consumer.startField("cpu_count", 2)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 2)
+
+ consumer.startField("mem_capacity", 3)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_capacity", 3)
+
+ consumer.startField("guests_terminated", 4)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 4)
+
+ consumer.startField("guests_running", 5)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 5)
+
+ consumer.startField("guests_error", 6)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 6)
+
+ consumer.startField("guests_invalid", 7)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 7)
+
+ consumer.startField("cpu_limit", 8)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_usage", 9)
+ consumer.addDouble(data.cpuUsage)
+ consumer.endField("cpu_usage", 9)
+
+ consumer.startField("cpu_demand", 10)
+ consumer.addDouble(data.cpuUsage)
+ consumer.endField("cpu_demand", 10)
+
+ consumer.startField("cpu_utilization", 11)
+ consumer.addDouble(data.cpuUtilization)
+ consumer.endField("cpu_utilization", 11)
+
+ consumer.startField("cpu_time_active", 12)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 12)
+
+ consumer.startField("cpu_time_idle", 13)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 13)
+
+ consumer.startField("cpu_time_steal", 14)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 14)
+
+ consumer.startField("cpu_time_lost", 15)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 15)
+
+ consumer.startField("power_total", 16)
+ consumer.addDouble(data.powerTotal)
+ consumer.endField("power_total", 16)
+
+ consumer.startField("uptime", 17)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 17)
+
+ consumer.startField("downtime", 18)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 18)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 19)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 19)
+ }
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ /**
+ * The schema of the host data.
+ */
+ val SCHEMA: MessageType = Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_demand"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_utilization"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_total"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time")
+ )
+ .named("host")
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..6645028e
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServerTableReader
+import org.opendc.trace.util.parquet.LocalParquetWriter
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServerTableReader]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
+
+ override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
+
+ override fun toString(): String = "server-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [ServerTableReader].
+ */
+ private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServerTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("server_id", 1)
+ consumer.addBinary(Binary.fromString(data.server.id))
+ consumer.endField("server_id", 1)
+
+ consumer.startField("server_name", 2)
+ consumer.addBinary(Binary.fromString(data.server.name))
+ consumer.endField("server_name", 2)
+
+ val hostId = data.host?.id
+ if (hostId != null) {
+ consumer.startField("host_id", 3)
+ consumer.addBinary(Binary.fromString(hostId))
+ consumer.endField("host_id", 3)
+ }
+
+ consumer.startField("mem_capacity", 4)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_capacity", 4)
+
+ consumer.startField("cpu_count", 5)
+ consumer.addInteger(data.server.cpuCount)
+ consumer.endField("cpu_count", 5)
+
+ consumer.startField("cpu_limit", 6)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 6)
+
+ consumer.startField("cpu_time_active", 7)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 7)
+
+ consumer.startField("cpu_time_idle", 8)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 8)
+
+ consumer.startField("cpu_time_steal", 9)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 9)
+
+ consumer.startField("cpu_time_lost", 10)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 10)
+
+ consumer.startField("uptime", 11)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 11)
+
+ consumer.startField("downtime", 12)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 12)
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 13)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 13)
+ }
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 14)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 14)
+ }
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ /**
+ * The schema of the server data.
+ */
+ val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_name"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time")
+
+ )
+ .named("server")
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
new file mode 100644
index 00000000..6908a018
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Types
+import org.opendc.compute.telemetry.table.ServiceTableReader
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServiceTableReader]s.
+ */
+public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
+
+ override fun toString(): String = "service-writer"
+
+ /**
+ * A [WriteSupport] implementation for a [ServiceTableReader].
+ */
+ private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServiceTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("hosts_up", 1)
+ consumer.addInteger(data.hostsUp)
+ consumer.endField("hosts_up", 1)
+
+ consumer.startField("hosts_down", 2)
+ consumer.addInteger(data.hostsDown)
+ consumer.endField("hosts_down", 2)
+
+ consumer.startField("servers_pending", 3)
+ consumer.addInteger(data.serversPending)
+ consumer.endField("servers_pending", 3)
+
+ consumer.startField("servers_active", 4)
+ consumer.addInteger(data.serversActive)
+ consumer.endField("servers_active", 4)
+
+ consumer.startField("attempts_success", 5)
+ consumer.addInteger(data.attemptsSuccess)
+ consumer.endField("attempts_pending", 5)
+
+ consumer.startField("attempts_failure", 6)
+ consumer.addInteger(data.attemptsFailure)
+ consumer.endField("attempts_failure", 6)
+
+ consumer.startField("attempts_error", 7)
+ consumer.addInteger(data.attemptsError)
+ consumer.endField("attempts_error", 7)
+
+ consumer.endMessage()
+ }
+ }
+
+ private companion object {
+ private val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error")
+ )
+ .named("service")
+ }
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
new file mode 100644
index 00000000..a2e82df1
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
+ */
+internal fun UUID.toBinary(): Binary {
+ val bb = ByteBuffer.allocate(16)
+ bb.putLong(mostSignificantBits)
+ bb.putLong(leastSignificantBits)
+ bb.rewind()
+ return Binary.fromConstantByteBuffer(bb)
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt
new file mode 100644
index 00000000..58b7853d
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostInfo.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+/**
+ * Information about a host exposed to the telemetry service.
+ */
+public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
new file mode 100644
index 00000000..3761b4b3
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader {
+
+ public fun copy(): HostTableReader
+
+ public fun setValues(table: HostTableReader)
+
+ /**
+ * The [HostInfo] of the host to which the row belongs to.
+ */
+ public val host: HostInfo
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz)
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power usage of the host in W.
+ */
+ public val powerUsage: Double
+
+ /**
+ * The total power consumption of the host since last time in J.
+ */
+ public val powerTotal: Double
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTime: Instant?
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt
new file mode 100644
index 00000000..96c5bb13
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+/**
+ * Static information about a server exposed to the telemetry service.
+ */
+public data class ServerInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val imageId: String,
+ val imageName: String,
+ val cpuCount: Int,
+ val memCapacity: Long
+)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
new file mode 100644
index 00000000..1d1ecd10
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a server trace entry.
+ */
+public interface ServerTableReader {
+
+ public fun copy(): ServerTableReader
+
+ public fun setValues(table: ServerTableReader)
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [ServerInfo] of the server to which the row belongs to.
+ */
+ public val server: ServerInfo
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the server was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the server booted.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The capacity of the CPUs of the servers (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the server.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the server.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt
new file mode 100644
index 00000000..0d8b2abd
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for the compute service.
+ */
+public data class ServiceData(
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversTotal: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
+)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(
+ timestamp,
+ hostsUp,
+ hostsDown,
+ serversTotal,
+ serversPending,
+ serversActive,
+ attemptsSuccess,
+ attemptsFailure,
+ attemptsError
+ )
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
new file mode 100644
index 00000000..90b0c6ea
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.telemetry.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a service trace entry.
+ */
+public interface ServiceTableReader {
+
+ public fun copy(): ServiceTableReader
+
+ public fun setValues(table: ServiceTableReader)
+
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of servers that are registered with the compute service..
+ */
+ public val serversTotal: Int
+
+ /**
+ * The number of servers that are pending to be scheduled.
+ */
+ public val serversPending: Int
+
+ /**
+ * The number of servers that are currently active.
+ */
+ public val serversActive: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to scheduler error.
+ */
+ public val attemptsError: Int
+}
diff --git a/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml b/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..0dfb75f2
--- /dev/null
+++ b/opendc-compute/opendc-compute-telemetry/src/test/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 AtLarge Research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="trace" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>