From 059a09949e839fe9e906c398710b5235b452b0e0 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 10 Nov 2023 22:30:49 +0100 Subject: added greenifier demo, fixed HostTableReader --- .../org/opendc/experiments/compute/TraceHelpers.kt | 2 +- .../export/parquet/ParquetHostDataWriter.kt | 147 ++++++++++++--------- .../export/parquet/ParquetServerDataWriter.kt | 99 +++++++------- .../compute/telemetry/ComputeMetricReader.kt | 52 +++++++- .../compute/telemetry/table/HostTableReader.kt | 13 +- .../compute/telemetry/table/ServerTableReader.kt | 4 + 6 files changed, 199 insertions(+), 118 deletions(-) (limited to 'opendc-experiments/opendc-experiments-compute') diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt index 16d28edb..5d4c88cd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt @@ -104,7 +104,7 @@ public suspend fun ComputeService.replay( // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() - delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000)) // Stop the server after reaching the end-time of the virtual machine server.stop() diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt index 979aebeb..75e792d0 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt @@ -78,68 +78,80 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : consumer.addBinary(UUID.fromString(data.host.id).toBinary()) consumer.endField("host_id", 1) - consumer.startField("uptime", 2) - consumer.addLong(data.uptime) - consumer.endField("uptime", 2) + consumer.startField("cpu_count", 2) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 2) - consumer.startField("downtime", 3) - consumer.addLong(data.downtime) - consumer.endField("downtime", 3) + consumer.startField("mem_capacity", 3) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_capacity", 3) - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 4) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 4) - } + consumer.startField("guests_terminated", 4) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 4) - consumer.startField("cpu_count", 5) - consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 5) + consumer.startField("guests_running", 5) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 5) - consumer.startField("cpu_limit", 6) + consumer.startField("guests_error", 6) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 6) + + consumer.startField("guests_invalid", 7) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 7) + + consumer.startField("cpu_limit", 8) consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 6) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_usage", 9) + consumer.addDouble(data.cpuUsage) + consumer.endField("cpu_usage", 9) + + consumer.startField("cpu_demand", 10) + consumer.addDouble(data.cpuUsage) + consumer.endField("cpu_demand", 10) + + consumer.startField("cpu_utilization", 11) + consumer.addDouble(data.cpuUtilization) + consumer.endField("cpu_utilization", 11) - consumer.startField("cpu_time_active", 7) + consumer.startField("cpu_time_active", 12) consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 7) + consumer.endField("cpu_time_active", 12) - consumer.startField("cpu_time_idle", 8) + consumer.startField("cpu_time_idle", 13) consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 8) + consumer.endField("cpu_time_idle", 13) - consumer.startField("cpu_time_steal", 9) + consumer.startField("cpu_time_steal", 14) consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 9) + consumer.endField("cpu_time_steal", 14) - consumer.startField("cpu_time_lost", 10) + consumer.startField("cpu_time_lost", 15) consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 10) + consumer.endField("cpu_time_lost", 15) - consumer.startField("mem_limit", 11) - consumer.addLong(data.host.memCapacity) - consumer.endField("mem_limit", 11) - - consumer.startField("power_total", 12) + consumer.startField("power_total", 16) consumer.addDouble(data.powerTotal) - consumer.endField("power_total", 12) + consumer.endField("power_total", 16) - consumer.startField("guests_terminated", 13) - consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 13) - - consumer.startField("guests_running", 14) - consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 14) + consumer.startField("uptime", 17) + consumer.addLong(data.uptime) + consumer.endField("uptime", 17) - consumer.startField("guests_error", 15) - consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 15) + consumer.startField("downtime", 18) + consumer.addLong(data.downtime) + consumer.endField("downtime", 18) - consumer.startField("guests_invalid", 16) - consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 16) + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 19) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 19) + } consumer.endMessage() } @@ -162,21 +174,35 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .`as`(LogicalTypeAnnotation.uuidType()) .named("host_id"), Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), + .named("mem_capacity"), Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time"), + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), Types .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_demand"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_utilization"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("cpu_time_active"), @@ -189,24 +215,19 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : Types .required(PrimitiveType.PrimitiveTypeName.INT64) .named("cpu_time_lost"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_limit"), Types .required(PrimitiveType.PrimitiveTypeName.DOUBLE) .named("power_total"), Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_terminated"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_running"), + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_error"), + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("guests_invalid") + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time") ) .named("host") } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt index 99ef83c6..ed5bb64f 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt @@ -86,55 +86,55 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : consumer.endField("host_id", 2) } - consumer.startField("uptime", 3) - consumer.addLong(data.uptime) - consumer.endField("uptime", 3) - - consumer.startField("downtime", 4) - consumer.addLong(data.downtime) - consumer.endField("downtime", 4) - - val bootTime = data.bootTime - if (bootTime != null) { - consumer.startField("boot_time", 5) - consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 5) - } - - val provisionTime = data.provisionTime - if (provisionTime != null) { - consumer.startField("provision_time", 6) - consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 6) - } + consumer.startField("mem_capacity", 3) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_capacity", 3) - consumer.startField("cpu_count", 7) + consumer.startField("cpu_count", 4) consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 7) + consumer.endField("cpu_count", 4) - consumer.startField("cpu_limit", 8) + consumer.startField("cpu_limit", 5) consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 8) + consumer.endField("cpu_limit", 5) - consumer.startField("cpu_time_active", 9) + consumer.startField("cpu_time_active", 6) consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 9) + consumer.endField("cpu_time_active", 6) - consumer.startField("cpu_time_idle", 10) + consumer.startField("cpu_time_idle", 7) consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 10) + consumer.endField("cpu_time_idle", 7) - consumer.startField("cpu_time_steal", 11) + consumer.startField("cpu_time_steal", 8) consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 11) + consumer.endField("cpu_time_steal", 8) - consumer.startField("cpu_time_lost", 12) + consumer.startField("cpu_time_lost", 9) consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 12) + consumer.endField("cpu_time_lost", 9) - consumer.startField("mem_limit", 13) - consumer.addLong(data.server.memCapacity) - consumer.endField("mem_limit", 13) + consumer.startField("uptime", 10) + consumer.addLong(data.uptime) + consumer.endField("uptime", 10) + + consumer.startField("downtime", 11) + consumer.addLong(data.downtime) + consumer.endField("downtime", 11) + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 12) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 12) + } + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 13) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 13) + } consumer.endMessage() } @@ -162,18 +162,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .named("host_id"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("uptime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("downtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("provision_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("boot_time"), + .named("mem_capacity"), Types .required(PrimitiveType.PrimitiveTypeName.INT32) .named("cpu_count"), @@ -194,7 +183,19 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .named("cpu_time_lost"), Types .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_limit") + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + ) .named("server") } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index efd38a3c..de0f0b7c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -95,7 +95,7 @@ public class ComputeMetricReader( for (host in service.hosts) { val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } reader.record(now) - monitor.record(reader) + monitor.record(reader.copy()) reader.reset() } @@ -185,6 +185,34 @@ public class ComputeMetricReader( * An aggregator for host metrics before they are reported. */ private class HostTableReaderImpl(host: Host) : HostTableReader { + override fun copy(): HostTableReader { + val newHostTable = HostTableReaderImpl(_host) + newHostTable.setValues(this) + + return newHostTable + } + + override fun setValues(table: HostTableReader) { + _timestamp = table.timestamp + _guestsTerminated = table.guestsTerminated + _guestsRunning = table.guestsRunning + _guestsError = table.guestsError + _guestsInvalid = table.guestsInvalid + _cpuLimit = table.cpuLimit + _cpuDemand = table.cpuDemand + _cpuUsage = table.cpuUsage + _cpuUtilization = table.cpuUtilization + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + _powerUsage = table.powerUsage + _powerTotal = table.powerTotal + _uptime = table.uptime + _downtime = table.downtime + _bootTime = table.bootTime + } + private val _host = host override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) @@ -326,6 +354,28 @@ public class ComputeMetricReader( * An aggregator for server metrics before they are reported. */ private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + override fun copy(): ServerTableReader { + val newServerTable = ServerTableReaderImpl(service, _server) + newServerTable.setValues(this) + + return newServerTable + } + + override fun setValues(table: ServerTableReader) { + _timestamp = table.timestamp + _uptime = table.uptime + _downtime = table.downtime + _provisionTime = table.provisionTime + _bootTime = table.bootTime + _cpuLimit = table.cpuLimit + _cpuActiveTime = table.cpuActiveTime + _cpuIdleTime = table.cpuIdleTime + _cpuStealTime = table.cpuStealTime + _cpuLostTime = table.cpuLostTime + + host = table.host + } + private val _server = server /** diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt index e6953550..9af4d13e 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt @@ -28,16 +28,21 @@ import java.time.Instant * An interface that is used to read a row of a host trace entry. */ public interface HostTableReader { - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant + + public fun copy() : HostTableReader + + public fun setValues(table: HostTableReader) /** * The [HostInfo] of the host to which the row belongs to. */ public val host: HostInfo + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + /** * The number of guests that are in a terminated state. */ diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt index c4e2fb4c..c6d8f1e1 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt @@ -28,6 +28,10 @@ import java.time.Instant * An interface that is used to read a row of a server trace entry. */ public interface ServerTableReader { + + public fun copy(): ServerTableReader + + public fun setValues(table: ServerTableReader) /** * The timestamp of the current entry of the reader. */ -- cgit v1.2.3