From 4562f52c9b540944200b33d4ffbd60b3cbc5ee79 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Wed, 17 Jun 2026 18:08:56 +0200 Subject: feat: managed to retrieve metric-by-metric from Redis in real time --- .../java/org/opendc/common/ProtobufMetrics.java | 161 ++++++++++++++++++++- .../main/kotlin/org/opendc/common/utils/Redis.kt | 64 +++++--- opendc-common/src/main/resources/schema.proto | 4 +- opendc-common/src/main/resources/subscriber.toml | 3 + .../failure/models/TraceBasedFailureModel.kt | 4 + .../compute/simulator/scheduler/SmartScheduler.kt | 3 - .../simulator/telemetry/KafkaComputeMonitor.kt | 2 + .../experiments/base/runner/ExperimentCli.kt | 2 +- .../experiments/base/runner/ExperimentListener.kt | 5 +- output/greenifier-demo-scaling/trackr.json | 4 + resources/experiments/experiment_failures.json | 26 ++++ resources/failures/Skype_user_reported.parquet | Bin 0 -> 3557 bytes shell_scripts/useful_commands.sh | 19 +++ 13 files changed, 266 insertions(+), 31 deletions(-) create mode 100644 opendc-common/src/main/resources/subscriber.toml create mode 100644 resources/experiments/experiment_failures.json create mode 100755 resources/failures/Skype_user_reported.parquet create mode 100644 shell_scripts/useful_commands.sh diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java index c0f15f63..b65155ca 100644 --- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java +++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java @@ -71,6 +71,18 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { * @return The energyusage. */ double getEnergyusage(); + + /** + * double uptime = 6; + * @return The uptime. + */ + double getUptime(); + + /** + * double downtime = 7; + * @return The downtime. + */ + double getDowntime(); } /** * Protobuf type {@code proto.ProtoExport} @@ -227,6 +239,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return energyusage_; } + public static final int UPTIME_FIELD_NUMBER = 6; + private double uptime_ = 0D; + /** + * double uptime = 6; + * @return The uptime. + */ + @java.lang.Override + public double getUptime() { + return uptime_; + } + + public static final int DOWNTIME_FIELD_NUMBER = 7; + private double downtime_ = 0D; + /** + * double downtime = 7; + * @return The downtime. + */ + @java.lang.Override + public double getDowntime() { + return downtime_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -256,6 +290,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToRawLongBits(energyusage_) != 0) { output.writeDouble(5, energyusage_); } + if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) { + output.writeDouble(6, uptime_); + } + if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) { + output.writeDouble(7, downtime_); + } getUnknownFields().writeTo(output); } private int computeSerializedSize_0() { @@ -278,6 +318,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { size += com.google.protobuf.CodedOutputStream .computeDoubleSize(5, energyusage_); } + if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(6, uptime_); + } + if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(7, downtime_); + } return size; } @java.lang.Override @@ -314,6 +362,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToLongBits(getEnergyusage()) != java.lang.Double.doubleToLongBits( other.getEnergyusage())) return false; + if (java.lang.Double.doubleToLongBits(getUptime()) + != java.lang.Double.doubleToLongBits( + other.getUptime())) return false; + if (java.lang.Double.doubleToLongBits(getDowntime()) + != java.lang.Double.doubleToLongBits( + other.getDowntime())) return false; if (!getUnknownFields().equals(other.getUnknownFields())) return false; return true; } @@ -337,6 +391,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER; hash = (53 * hash) + com.google.protobuf.Internal.hashLong( java.lang.Double.doubleToLongBits(getEnergyusage())); + hash = (37 * hash) + UPTIME_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getUptime())); + hash = (37 * hash) + DOWNTIME_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getDowntime())); hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -473,6 +533,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { tasksactive_ = 0; cpuutilization_ = 0D; energyusage_ = 0D; + uptime_ = 0D; + downtime_ = 0D; return this; } @@ -521,6 +583,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (((from_bitField0_ & 0x00000010) != 0)) { result.energyusage_ = energyusage_; } + if (((from_bitField0_ & 0x00000020) != 0)) { + result.uptime_ = uptime_; + } + if (((from_bitField0_ & 0x00000040) != 0)) { + result.downtime_ = downtime_; + } } @java.lang.Override @@ -554,6 +622,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToRawLongBits(other.getEnergyusage()) != 0) { setEnergyusage(other.getEnergyusage()); } + if (java.lang.Double.doubleToRawLongBits(other.getUptime()) != 0) { + setUptime(other.getUptime()); + } + if (java.lang.Double.doubleToRawLongBits(other.getDowntime()) != 0) { + setDowntime(other.getDowntime()); + } this.mergeUnknownFields(other.getUnknownFields()); onChanged(); return this; @@ -605,6 +679,16 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { bitField0_ |= 0x00000010; break; } // case 41 + case 49: { + uptime_ = input.readDouble(); + bitField0_ |= 0x00000020; + break; + } // case 49 + case 57: { + downtime_ = input.readDouble(); + bitField0_ |= 0x00000040; + break; + } // case 57 default: { if (!super.parseUnknownField(input, extensionRegistry, tag)) { done = true; // was an endgroup tag @@ -862,6 +946,70 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return this; } + private double uptime_ ; + /** + * double uptime = 6; + * @return The uptime. + */ + @java.lang.Override + public double getUptime() { + return uptime_; + } + /** + * double uptime = 6; + * @param value The uptime to set. + * @return This builder for chaining. + */ + public Builder setUptime(double value) { + + uptime_ = value; + bitField0_ |= 0x00000020; + onChanged(); + return this; + } + /** + * double uptime = 6; + * @return This builder for chaining. + */ + public Builder clearUptime() { + bitField0_ = (bitField0_ & ~0x00000020); + uptime_ = 0D; + onChanged(); + return this; + } + + private double downtime_ ; + /** + * double downtime = 7; + * @return The downtime. + */ + @java.lang.Override + public double getDowntime() { + return downtime_; + } + /** + * double downtime = 7; + * @param value The downtime to set. + * @return This builder for chaining. + */ + public Builder setDowntime(double value) { + + downtime_ = value; + bitField0_ |= 0x00000040; + onChanged(); + return this; + } + /** + * double downtime = 7; + * @return This builder for chaining. + */ + public Builder clearDowntime() { + bitField0_ = (bitField0_ & ~0x00000040); + downtime_ = 0D; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:proto.ProtoExport) } @@ -927,11 +1075,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { descriptor; static { java.lang.String[] descriptorData = { - "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" + - "timestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013task" + - "sactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023\n" + - "\013energyusage\030\005 \001(\001B$\n\021org.opendc.commonB" + - "\017ProtobufMetricsb\006proto3" + "\n\014schema.proto\022\005proto\"\225\001\n\013ProtoExport\022\021\n" + + "\ttimestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013tas" + + "ksactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023" + + "\n\013energyusage\030\005 \001(\001\022\016\n\006uptime\030\006 \001(\001\022\020\n\010d" + + "owntime\030\007 \001(\001B$\n\021org.opendc.commonB\017Prot" + + "obufMetricsb\006proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -942,7 +1091,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { internal_static_proto_ProtoExport_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_proto_ProtoExport_descriptor, - new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", }); + new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", "Uptime", "Downtime", }); descriptor.resolveAllFeaturesImmutable(); } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt index b659d40a..bf674a33 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt @@ -4,40 +4,68 @@ import com.fasterxml.jackson.dataformat.toml.TomlMapper import redis.clients.jedis.RedisClient import redis.clients.jedis.StreamEntryID import redis.clients.jedis.params.XReadParams +import java.util.Map import java.util.Properties + /** * This class represents the Redis server instance. * @author Mateusz Kwiatkowski * @see https://redis.io/docs/latest/ * * @see https://redis.io/docs/latest/develop/data-types/streams/ + * + * @see https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html */ @Suppress("DEPRECATION") -public class Redis { - private var properties : Properties +public class Redis private constructor() { + private var jedis : RedisClient? = null + private var properties : Properties? = null init { - properties = TomlMapper().readerFor(Properties().javaClass) - .readValue(Kafka::class.java.getResource("/producer.toml")) + properties = TomlMapper().readerFor(Properties().javaClass).readValue(Kafka::class.java.getResource("/subscriber.toml")) + jedis = RedisClient.create("redis://localhost:${properties?.getProperty("port")}") } - public fun run() { - val jedis : RedisClient = RedisClient.create("redis://localhost:6379") + public companion object { + private var instance: Redis? = null - val res5 = jedis.xread( - XReadParams.xReadParams().block(300).count(100), - object : HashMap() { - init { - put("${properties.getProperty("table")}", StreamEntryID()) + public fun getInstance(): Redis? { + if (instance == null) { + instance = Redis() + } + return instance + } + } + + public fun readOne() { + while(true) { + val messages = jedis?.xread( + XReadParams.xReadParams(), + Map.of("${properties?.getProperty("table")}", StreamEntryID() + )); + + if (messages != null) { + for (stream in messages) { + for (entry in stream.value) { + if(entry.getFields().get("downtime") != null) { + entry.getFields().get("downtime")?.toDouble()?.let { + if (it > 0) { + println("ID: " + entry.getID()) + } + } + } } - }) + } + } + } + // https://redis.io/docs/latest/develop/use-cases/streaming/java-jedis/ + // In Redis you can subscribe to updates to a stream. + // You should base your application off this. + // You can listen for new items with XREAD - // in Redis you can subscribe to updates to a stream. - // you should base your application off this. - // you can listen for new items with XREAD - println(res5) - jedis.close() + // do not close for now + //jedis.close() } -} \ No newline at end of file +} diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto index d0aa18d5..ea515a39 100644 --- a/opendc-common/src/main/resources/schema.proto +++ b/opendc-common/src/main/resources/schema.proto @@ -11,4 +11,6 @@ message ProtoExport { int32 tasksactive = 3; double cpuutilization = 4; double energyusage = 5; -} \ No newline at end of file + double uptime = 6; + double downtime = 7; +} diff --git a/opendc-common/src/main/resources/subscriber.toml b/opendc-common/src/main/resources/subscriber.toml new file mode 100644 index 00000000..75dece02 --- /dev/null +++ b/opendc-common/src/main/resources/subscriber.toml @@ -0,0 +1,3 @@ +"table" = "postgres_topic" +"server" = "localhost" +"port" = "6379" diff --git a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt index 3bd253da..6cb02bb8 100644 --- a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt +++ b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt @@ -98,6 +98,10 @@ public class TraceBasedFailureModel( pathToFile: String, startPoint: Double, ): List { + /* + I assume this returns a file descriptor. + @Mateusz Kwiatkowski + */ val trace = Trace.open(File(pathToFile), "failure") val reader = checkNotNull(trace.getTable(TABLE_FAILURES)).newReader() diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt index baadd806..0edd97cf 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt @@ -60,9 +60,6 @@ public class SmartScheduler : ComputeScheduler { return SchedulingResult(SchedulingResultType.EMPTY) } - // Benefits of a digital twin: during operations you make sure what is happening in the real world. - // The use-case is making split-second automated decisions before operators can make them. - // Make a strong case for making a Digital Twin. override fun removeTask( task: ServiceTask, host: HostView?, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt index c8368af2..ddd4a28a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt @@ -47,6 +47,8 @@ public class KafkaComputeMonitor : ComputeMonitor { .setTasksactive(reader.tasksActive) .setCpuutilization(reader.cpuUtilization) .setEnergyusage(reader.energyUsage) + .setUptime(reader.uptime.toDouble()) + .setDowntime(reader.downtime.toDouble()) .build() this.send(packet) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index 1fe597ea..55aed21b 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -49,7 +49,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { override fun run() { try { val experiment = getExperiment(experimentPath) - HTTPClient.getInstance()?.sendExperiment(experimentPath) + //HTTPClient.getInstance()?.sendExperiment(experimentPath) runExperiment(experiment) } catch (e: IOException) { println("${e.message}") diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt index de759c0f..c221543d 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt @@ -14,6 +14,7 @@ import org.opendc.common.utils.Redis */ public fun runListener() { PostgresqlDB() - Redis().run() + val redisClient = Redis.getInstance() + redisClient?.readOne() JavalinRunner() -} \ No newline at end of file +} diff --git a/output/greenifier-demo-scaling/trackr.json b/output/greenifier-demo-scaling/trackr.json index a595beff..f2eb187e 100644 --- a/output/greenifier-demo-scaling/trackr.json +++ b/output/greenifier-demo-scaling/trackr.json @@ -16,5 +16,9 @@ "battery": true, "service": true } + }, + "failureModel": { + "type": "trace-based", + "pathToFile": "resources/failures/Skype_user_reported.parquet" } }] \ No newline at end of file diff --git a/resources/experiments/experiment_failures.json b/resources/experiments/experiment_failures.json new file mode 100644 index 00000000..4fd5cdf6 --- /dev/null +++ b/resources/experiments/experiment_failures.json @@ -0,0 +1,26 @@ +{ + "name": "greenifier-demo-scaling", + "topologies": [ + { + "pathToFile": "resources/topologies/surf.json" + } + ], + "workloads": [ + { + "pathToFile": "resources/workloads/surf_month", + "type": "ComputeWorkload" + } + ], + + "failureModels" : [ + { + "pathToFile": "resources/failures/Skype_user_reported.parquet", + "type": "trace-based" + } + ], + "exportModels": [ + { + "exportInterval": 3600 + } + ] +} diff --git a/resources/failures/Skype_user_reported.parquet b/resources/failures/Skype_user_reported.parquet new file mode 100755 index 00000000..316706c7 Binary files /dev/null and b/resources/failures/Skype_user_reported.parquet differ diff --git a/shell_scripts/useful_commands.sh b/shell_scripts/useful_commands.sh new file mode 100644 index 00000000..8ff604e2 --- /dev/null +++ b/shell_scripts/useful_commands.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +doas su postgres +# In this project we only use the ``opendc'' database. +psql -d opendc + +# Queries +# Selects everything from the topic which Kafka writes to. +SELECT * FROM postgres_topic; + +# Redis CLI +# Connect to Redis +redis-cli -h localhost -p 6379 +# Very useful tutorial https://redis.io/docs/latest/develop/data-types/streams/ +# https://redis.io/docs/latest/develop/tools/cli/ +# Returns all items in the Redis stream +XRANGE postgres_topic - + +XTRIM postgres_topic MAXLEN 0 +# https://codesignal.com/learn/courses/mastering-redis-for-high-performance-applications-with-java-and-jedis-1/lessons/redis-streams-with-java -- cgit v1.2.3