From 4562f52c9b540944200b33d4ffbd60b3cbc5ee79 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Wed, 17 Jun 2026 18:08:56 +0200 Subject: feat: managed to retrieve metric-by-metric from Redis in real time --- .../java/org/opendc/common/ProtobufMetrics.java | 161 ++++++++++++++++++++- .../main/kotlin/org/opendc/common/utils/Redis.kt | 64 +++++--- opendc-common/src/main/resources/schema.proto | 4 +- opendc-common/src/main/resources/subscriber.toml | 3 + 4 files changed, 207 insertions(+), 25 deletions(-) create mode 100644 opendc-common/src/main/resources/subscriber.toml (limited to 'opendc-common') diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java index c0f15f63..b65155ca 100644 --- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java +++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java @@ -71,6 +71,18 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { * @return The energyusage. */ double getEnergyusage(); + + /** + * double uptime = 6; + * @return The uptime. + */ + double getUptime(); + + /** + * double downtime = 7; + * @return The downtime. + */ + double getDowntime(); } /** * Protobuf type {@code proto.ProtoExport} @@ -227,6 +239,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return energyusage_; } + public static final int UPTIME_FIELD_NUMBER = 6; + private double uptime_ = 0D; + /** + * double uptime = 6; + * @return The uptime. + */ + @java.lang.Override + public double getUptime() { + return uptime_; + } + + public static final int DOWNTIME_FIELD_NUMBER = 7; + private double downtime_ = 0D; + /** + * double downtime = 7; + * @return The downtime. + */ + @java.lang.Override + public double getDowntime() { + return downtime_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -256,6 +290,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToRawLongBits(energyusage_) != 0) { output.writeDouble(5, energyusage_); } + if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) { + output.writeDouble(6, uptime_); + } + if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) { + output.writeDouble(7, downtime_); + } getUnknownFields().writeTo(output); } private int computeSerializedSize_0() { @@ -278,6 +318,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { size += com.google.protobuf.CodedOutputStream .computeDoubleSize(5, energyusage_); } + if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(6, uptime_); + } + if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(7, downtime_); + } return size; } @java.lang.Override @@ -314,6 +362,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToLongBits(getEnergyusage()) != java.lang.Double.doubleToLongBits( other.getEnergyusage())) return false; + if (java.lang.Double.doubleToLongBits(getUptime()) + != java.lang.Double.doubleToLongBits( + other.getUptime())) return false; + if (java.lang.Double.doubleToLongBits(getDowntime()) + != java.lang.Double.doubleToLongBits( + other.getDowntime())) return false; if (!getUnknownFields().equals(other.getUnknownFields())) return false; return true; } @@ -337,6 +391,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER; hash = (53 * hash) + com.google.protobuf.Internal.hashLong( java.lang.Double.doubleToLongBits(getEnergyusage())); + hash = (37 * hash) + UPTIME_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getUptime())); + hash = (37 * hash) + DOWNTIME_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getDowntime())); hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -473,6 +533,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { tasksactive_ = 0; cpuutilization_ = 0D; energyusage_ = 0D; + uptime_ = 0D; + downtime_ = 0D; return this; } @@ -521,6 +583,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (((from_bitField0_ & 0x00000010) != 0)) { result.energyusage_ = energyusage_; } + if (((from_bitField0_ & 0x00000020) != 0)) { + result.uptime_ = uptime_; + } + if (((from_bitField0_ & 0x00000040) != 0)) { + result.downtime_ = downtime_; + } } @java.lang.Override @@ -554,6 +622,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (java.lang.Double.doubleToRawLongBits(other.getEnergyusage()) != 0) { setEnergyusage(other.getEnergyusage()); } + if (java.lang.Double.doubleToRawLongBits(other.getUptime()) != 0) { + setUptime(other.getUptime()); + } + if (java.lang.Double.doubleToRawLongBits(other.getDowntime()) != 0) { + setDowntime(other.getDowntime()); + } this.mergeUnknownFields(other.getUnknownFields()); onChanged(); return this; @@ -605,6 +679,16 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { bitField0_ |= 0x00000010; break; } // case 41 + case 49: { + uptime_ = input.readDouble(); + bitField0_ |= 0x00000020; + break; + } // case 49 + case 57: { + downtime_ = input.readDouble(); + bitField0_ |= 0x00000040; + break; + } // case 57 default: { if (!super.parseUnknownField(input, extensionRegistry, tag)) { done = true; // was an endgroup tag @@ -862,6 +946,70 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return this; } + private double uptime_ ; + /** + * double uptime = 6; + * @return The uptime. + */ + @java.lang.Override + public double getUptime() { + return uptime_; + } + /** + * double uptime = 6; + * @param value The uptime to set. + * @return This builder for chaining. + */ + public Builder setUptime(double value) { + + uptime_ = value; + bitField0_ |= 0x00000020; + onChanged(); + return this; + } + /** + * double uptime = 6; + * @return This builder for chaining. + */ + public Builder clearUptime() { + bitField0_ = (bitField0_ & ~0x00000020); + uptime_ = 0D; + onChanged(); + return this; + } + + private double downtime_ ; + /** + * double downtime = 7; + * @return The downtime. + */ + @java.lang.Override + public double getDowntime() { + return downtime_; + } + /** + * double downtime = 7; + * @param value The downtime to set. + * @return This builder for chaining. + */ + public Builder setDowntime(double value) { + + downtime_ = value; + bitField0_ |= 0x00000040; + onChanged(); + return this; + } + /** + * double downtime = 7; + * @return This builder for chaining. + */ + public Builder clearDowntime() { + bitField0_ = (bitField0_ & ~0x00000040); + downtime_ = 0D; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:proto.ProtoExport) } @@ -927,11 +1075,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { descriptor; static { java.lang.String[] descriptorData = { - "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" + - "timestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013task" + - "sactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023\n" + - "\013energyusage\030\005 \001(\001B$\n\021org.opendc.commonB" + - "\017ProtobufMetricsb\006proto3" + "\n\014schema.proto\022\005proto\"\225\001\n\013ProtoExport\022\021\n" + + "\ttimestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013tas" + + "ksactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023" + + "\n\013energyusage\030\005 \001(\001\022\016\n\006uptime\030\006 \001(\001\022\020\n\010d" + + "owntime\030\007 \001(\001B$\n\021org.opendc.commonB\017Prot" + + "obufMetricsb\006proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -942,7 +1091,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { internal_static_proto_ProtoExport_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_proto_ProtoExport_descriptor, - new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", }); + new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", "Uptime", "Downtime", }); descriptor.resolveAllFeaturesImmutable(); } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt index b659d40a..bf674a33 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt @@ -4,40 +4,68 @@ import com.fasterxml.jackson.dataformat.toml.TomlMapper import redis.clients.jedis.RedisClient import redis.clients.jedis.StreamEntryID import redis.clients.jedis.params.XReadParams +import java.util.Map import java.util.Properties + /** * This class represents the Redis server instance. * @author Mateusz Kwiatkowski * @see https://redis.io/docs/latest/ * * @see https://redis.io/docs/latest/develop/data-types/streams/ + * + * @see https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html */ @Suppress("DEPRECATION") -public class Redis { - private var properties : Properties +public class Redis private constructor() { + private var jedis : RedisClient? = null + private var properties : Properties? = null init { - properties = TomlMapper().readerFor(Properties().javaClass) - .readValue(Kafka::class.java.getResource("/producer.toml")) + properties = TomlMapper().readerFor(Properties().javaClass).readValue(Kafka::class.java.getResource("/subscriber.toml")) + jedis = RedisClient.create("redis://localhost:${properties?.getProperty("port")}") } - public fun run() { - val jedis : RedisClient = RedisClient.create("redis://localhost:6379") + public companion object { + private var instance: Redis? = null - val res5 = jedis.xread( - XReadParams.xReadParams().block(300).count(100), - object : HashMap() { - init { - put("${properties.getProperty("table")}", StreamEntryID()) + public fun getInstance(): Redis? { + if (instance == null) { + instance = Redis() + } + return instance + } + } + + public fun readOne() { + while(true) { + val messages = jedis?.xread( + XReadParams.xReadParams(), + Map.of("${properties?.getProperty("table")}", StreamEntryID() + )); + + if (messages != null) { + for (stream in messages) { + for (entry in stream.value) { + if(entry.getFields().get("downtime") != null) { + entry.getFields().get("downtime")?.toDouble()?.let { + if (it > 0) { + println("ID: " + entry.getID()) + } + } + } } - }) + } + } + } + // https://redis.io/docs/latest/develop/use-cases/streaming/java-jedis/ + // In Redis you can subscribe to updates to a stream. + // You should base your application off this. + // You can listen for new items with XREAD - // in Redis you can subscribe to updates to a stream. - // you should base your application off this. - // you can listen for new items with XREAD - println(res5) - jedis.close() + // do not close for now + //jedis.close() } -} \ No newline at end of file +} diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto index d0aa18d5..ea515a39 100644 --- a/opendc-common/src/main/resources/schema.proto +++ b/opendc-common/src/main/resources/schema.proto @@ -11,4 +11,6 @@ message ProtoExport { int32 tasksactive = 3; double cpuutilization = 4; double energyusage = 5; -} \ No newline at end of file + double uptime = 6; + double downtime = 7; +} diff --git a/opendc-common/src/main/resources/subscriber.toml b/opendc-common/src/main/resources/subscriber.toml new file mode 100644 index 00000000..75dece02 --- /dev/null +++ b/opendc-common/src/main/resources/subscriber.toml @@ -0,0 +1,3 @@ +"table" = "postgres_topic" +"server" = "localhost" +"port" = "6379" -- cgit v1.2.3