summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-compute
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2023-11-10 22:30:49 +0100
committerDante Niewenhuis <d.niewenhuis@hotmail.com>2023-11-10 22:32:29 +0100
commit059a09949e839fe9e906c398710b5235b452b0e0 (patch)
tree6d8fdca4db4697b3d5dbe6ae291425d07fa84276 /opendc-experiments/opendc-experiments-compute
parentaa9149b9c49be570c2b14254d3c6a23d7c077e34 (diff)
added greenifier demo, fixed HostTableReader
Diffstat (limited to 'opendc-experiments/opendc-experiments-compute')
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt2
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt147
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt99
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt52
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt13
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt4
6 files changed, 199 insertions, 118 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
index 16d28edb..5d4c88cd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/TraceHelpers.kt
@@ -104,7 +104,7 @@ public suspend fun ComputeService.replay(
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
- delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
+ delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000))
// Stop the server after reaching the end-time of the virtual machine
server.stop()
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
index 979aebeb..75e792d0 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetHostDataWriter.kt
@@ -78,68 +78,80 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
consumer.addBinary(UUID.fromString(data.host.id).toBinary())
consumer.endField("host_id", 1)
- consumer.startField("uptime", 2)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 2)
+ consumer.startField("cpu_count", 2)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 2)
- consumer.startField("downtime", 3)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 3)
+ consumer.startField("mem_capacity", 3)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_capacity", 3)
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 4)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 4)
- }
+ consumer.startField("guests_terminated", 4)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 4)
- consumer.startField("cpu_count", 5)
- consumer.addInteger(data.host.cpuCount)
- consumer.endField("cpu_count", 5)
+ consumer.startField("guests_running", 5)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 5)
- consumer.startField("cpu_limit", 6)
+ consumer.startField("guests_error", 6)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 6)
+
+ consumer.startField("guests_invalid", 7)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 7)
+
+ consumer.startField("cpu_limit", 8)
consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 6)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_usage", 9)
+ consumer.addDouble(data.cpuUsage)
+ consumer.endField("cpu_usage", 9)
+
+ consumer.startField("cpu_demand", 10)
+ consumer.addDouble(data.cpuUsage)
+ consumer.endField("cpu_demand", 10)
+
+ consumer.startField("cpu_utilization", 11)
+ consumer.addDouble(data.cpuUtilization)
+ consumer.endField("cpu_utilization", 11)
- consumer.startField("cpu_time_active", 7)
+ consumer.startField("cpu_time_active", 12)
consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 7)
+ consumer.endField("cpu_time_active", 12)
- consumer.startField("cpu_time_idle", 8)
+ consumer.startField("cpu_time_idle", 13)
consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 8)
+ consumer.endField("cpu_time_idle", 13)
- consumer.startField("cpu_time_steal", 9)
+ consumer.startField("cpu_time_steal", 14)
consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 9)
+ consumer.endField("cpu_time_steal", 14)
- consumer.startField("cpu_time_lost", 10)
+ consumer.startField("cpu_time_lost", 15)
consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 10)
+ consumer.endField("cpu_time_lost", 15)
- consumer.startField("mem_limit", 11)
- consumer.addLong(data.host.memCapacity)
- consumer.endField("mem_limit", 11)
-
- consumer.startField("power_total", 12)
+ consumer.startField("power_total", 16)
consumer.addDouble(data.powerTotal)
- consumer.endField("power_total", 12)
+ consumer.endField("power_total", 16)
- consumer.startField("guests_terminated", 13)
- consumer.addInteger(data.guestsTerminated)
- consumer.endField("guests_terminated", 13)
-
- consumer.startField("guests_running", 14)
- consumer.addInteger(data.guestsRunning)
- consumer.endField("guests_running", 14)
+ consumer.startField("uptime", 17)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 17)
- consumer.startField("guests_error", 15)
- consumer.addInteger(data.guestsError)
- consumer.endField("guests_error", 15)
+ consumer.startField("downtime", 18)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 18)
- consumer.startField("guests_invalid", 16)
- consumer.addInteger(data.guestsInvalid)
- consumer.endField("guests_invalid", 16)
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 19)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 19)
+ }
consumer.endMessage()
}
@@ -162,22 +174,36 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.`as`(LogicalTypeAnnotation.uuidType())
.named("host_id"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
+ .named("mem_capacity"),
Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time"),
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
Types
.required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
Types
.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
.named("cpu_limit"),
Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_demand"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_utilization"),
+ Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("cpu_time_active"),
Types
@@ -190,23 +216,18 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.required(PrimitiveType.PrimitiveTypeName.INT64)
.named("cpu_time_lost"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_limit"),
- Types
.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
.named("power_total"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_terminated"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_running"),
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_error"),
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_invalid")
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time")
)
.named("host")
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
index 99ef83c6..ed5bb64f 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/export/parquet/ParquetServerDataWriter.kt
@@ -86,55 +86,55 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
consumer.endField("host_id", 2)
}
- consumer.startField("uptime", 3)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 3)
-
- consumer.startField("downtime", 4)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 4)
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 5)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 5)
- }
-
- val provisionTime = data.provisionTime
- if (provisionTime != null) {
- consumer.startField("provision_time", 6)
- consumer.addLong(provisionTime.toEpochMilli())
- consumer.endField("provision_time", 6)
- }
+ consumer.startField("mem_capacity", 3)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_capacity", 3)
- consumer.startField("cpu_count", 7)
+ consumer.startField("cpu_count", 4)
consumer.addInteger(data.server.cpuCount)
- consumer.endField("cpu_count", 7)
+ consumer.endField("cpu_count", 4)
- consumer.startField("cpu_limit", 8)
+ consumer.startField("cpu_limit", 5)
consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 8)
+ consumer.endField("cpu_limit", 5)
- consumer.startField("cpu_time_active", 9)
+ consumer.startField("cpu_time_active", 6)
consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 9)
+ consumer.endField("cpu_time_active", 6)
- consumer.startField("cpu_time_idle", 10)
+ consumer.startField("cpu_time_idle", 7)
consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 10)
+ consumer.endField("cpu_time_idle", 7)
- consumer.startField("cpu_time_steal", 11)
+ consumer.startField("cpu_time_steal", 8)
consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 11)
+ consumer.endField("cpu_time_steal", 8)
- consumer.startField("cpu_time_lost", 12)
+ consumer.startField("cpu_time_lost", 9)
consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 12)
+ consumer.endField("cpu_time_lost", 9)
- consumer.startField("mem_limit", 13)
- consumer.addLong(data.server.memCapacity)
- consumer.endField("mem_limit", 13)
+ consumer.startField("uptime", 10)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 10)
+
+ consumer.startField("downtime", 11)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 11)
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 12)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 12)
+ }
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 13)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 13)
+ }
consumer.endMessage()
}
@@ -162,18 +162,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.named("host_id"),
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("provision_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time"),
+ .named("mem_capacity"),
Types
.required(PrimitiveType.PrimitiveTypeName.INT32)
.named("cpu_count"),
@@ -194,7 +183,19 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.named("cpu_time_lost"),
Types
.required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_limit")
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+
)
.named("server")
}
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index efd38a3c..de0f0b7c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -95,7 +95,7 @@ public class ComputeMetricReader(
for (host in service.hosts) {
val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
reader.record(now)
- monitor.record(reader)
+ monitor.record(reader.copy())
reader.reset()
}
@@ -185,6 +185,34 @@ public class ComputeMetricReader(
* An aggregator for host metrics before they are reported.
*/
private class HostTableReaderImpl(host: Host) : HostTableReader {
+ override fun copy(): HostTableReader {
+ val newHostTable = HostTableReaderImpl(_host)
+ newHostTable.setValues(this)
+
+ return newHostTable
+ }
+
+ override fun setValues(table: HostTableReader) {
+ _timestamp = table.timestamp
+ _guestsTerminated = table.guestsTerminated
+ _guestsRunning = table.guestsRunning
+ _guestsError = table.guestsError
+ _guestsInvalid = table.guestsInvalid
+ _cpuLimit = table.cpuLimit
+ _cpuDemand = table.cpuDemand
+ _cpuUsage = table.cpuUsage
+ _cpuUtilization = table.cpuUtilization
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+ _powerUsage = table.powerUsage
+ _powerTotal = table.powerTotal
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _bootTime = table.bootTime
+ }
+
private val _host = host
override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
@@ -326,6 +354,28 @@ public class ComputeMetricReader(
* An aggregator for server metrics before they are reported.
*/
private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
+ override fun copy(): ServerTableReader {
+ val newServerTable = ServerTableReaderImpl(service, _server)
+ newServerTable.setValues(this)
+
+ return newServerTable
+ }
+
+ override fun setValues(table: ServerTableReader) {
+ _timestamp = table.timestamp
+ _uptime = table.uptime
+ _downtime = table.downtime
+ _provisionTime = table.provisionTime
+ _bootTime = table.bootTime
+ _cpuLimit = table.cpuLimit
+ _cpuActiveTime = table.cpuActiveTime
+ _cpuIdleTime = table.cpuIdleTime
+ _cpuStealTime = table.cpuStealTime
+ _cpuLostTime = table.cpuLostTime
+
+ host = table.host
+ }
+
private val _server = server
/**
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt
index e6953550..9af4d13e 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/HostTableReader.kt
@@ -28,10 +28,10 @@ import java.time.Instant
* An interface that is used to read a row of a host trace entry.
*/
public interface HostTableReader {
- /**
- * The timestamp of the current entry of the reader.
- */
- public val timestamp: Instant
+
+ public fun copy() : HostTableReader
+
+ public fun setValues(table: HostTableReader)
/**
* The [HostInfo] of the host to which the row belongs to.
@@ -39,6 +39,11 @@ public interface HostTableReader {
public val host: HostInfo
/**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
* The number of guests that are in a terminated state.
*/
public val guestsTerminated: Int
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt
index c4e2fb4c..c6d8f1e1 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/table/ServerTableReader.kt
@@ -28,6 +28,10 @@ import java.time.Instant
* An interface that is used to read a row of a server trace entry.
*/
public interface ServerTableReader {
+
+ public fun copy(): ServerTableReader
+
+ public fun setValues(table: ServerTableReader)
/**
* The timestamp of the current entry of the reader.
*/