summaryrefslogtreecommitdiff
path: root/opendc-common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-common/src/main')
-rw-r--r--opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java161
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt64
-rw-r--r--opendc-common/src/main/resources/schema.proto4
-rw-r--r--opendc-common/src/main/resources/subscriber.toml3
4 files changed, 207 insertions, 25 deletions
diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
index c0f15f63..b65155ca 100644
--- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
+++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
@@ -71,6 +71,18 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
* @return The energyusage.
*/
double getEnergyusage();
+
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ double getUptime();
+
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ double getDowntime();
}
/**
* Protobuf type {@code proto.ProtoExport}
@@ -227,6 +239,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return energyusage_;
}
+ public static final int UPTIME_FIELD_NUMBER = 6;
+ private double uptime_ = 0D;
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ @java.lang.Override
+ public double getUptime() {
+ return uptime_;
+ }
+
+ public static final int DOWNTIME_FIELD_NUMBER = 7;
+ private double downtime_ = 0D;
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ @java.lang.Override
+ public double getDowntime() {
+ return downtime_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -256,6 +290,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToRawLongBits(energyusage_) != 0) {
output.writeDouble(5, energyusage_);
}
+ if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) {
+ output.writeDouble(6, uptime_);
+ }
+ if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) {
+ output.writeDouble(7, downtime_);
+ }
getUnknownFields().writeTo(output);
}
private int computeSerializedSize_0() {
@@ -278,6 +318,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
size += com.google.protobuf.CodedOutputStream
.computeDoubleSize(5, energyusage_);
}
+ if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(6, uptime_);
+ }
+ if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(7, downtime_);
+ }
return size;
}
@java.lang.Override
@@ -314,6 +362,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToLongBits(getEnergyusage())
!= java.lang.Double.doubleToLongBits(
other.getEnergyusage())) return false;
+ if (java.lang.Double.doubleToLongBits(getUptime())
+ != java.lang.Double.doubleToLongBits(
+ other.getUptime())) return false;
+ if (java.lang.Double.doubleToLongBits(getDowntime())
+ != java.lang.Double.doubleToLongBits(
+ other.getDowntime())) return false;
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -337,6 +391,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER;
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
java.lang.Double.doubleToLongBits(getEnergyusage()));
+ hash = (37 * hash) + UPTIME_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getUptime()));
+ hash = (37 * hash) + DOWNTIME_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getDowntime()));
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -473,6 +533,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
tasksactive_ = 0;
cpuutilization_ = 0D;
energyusage_ = 0D;
+ uptime_ = 0D;
+ downtime_ = 0D;
return this;
}
@@ -521,6 +583,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (((from_bitField0_ & 0x00000010) != 0)) {
result.energyusage_ = energyusage_;
}
+ if (((from_bitField0_ & 0x00000020) != 0)) {
+ result.uptime_ = uptime_;
+ }
+ if (((from_bitField0_ & 0x00000040) != 0)) {
+ result.downtime_ = downtime_;
+ }
}
@java.lang.Override
@@ -554,6 +622,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToRawLongBits(other.getEnergyusage()) != 0) {
setEnergyusage(other.getEnergyusage());
}
+ if (java.lang.Double.doubleToRawLongBits(other.getUptime()) != 0) {
+ setUptime(other.getUptime());
+ }
+ if (java.lang.Double.doubleToRawLongBits(other.getDowntime()) != 0) {
+ setDowntime(other.getDowntime());
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -605,6 +679,16 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
bitField0_ |= 0x00000010;
break;
} // case 41
+ case 49: {
+ uptime_ = input.readDouble();
+ bitField0_ |= 0x00000020;
+ break;
+ } // case 49
+ case 57: {
+ downtime_ = input.readDouble();
+ bitField0_ |= 0x00000040;
+ break;
+ } // case 57
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag)) {
done = true; // was an endgroup tag
@@ -862,6 +946,70 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return this;
}
+ private double uptime_ ;
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ @java.lang.Override
+ public double getUptime() {
+ return uptime_;
+ }
+ /**
+ * <code>double uptime = 6;</code>
+ * @param value The uptime to set.
+ * @return This builder for chaining.
+ */
+ public Builder setUptime(double value) {
+
+ uptime_ = value;
+ bitField0_ |= 0x00000020;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>double uptime = 6;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearUptime() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ uptime_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ private double downtime_ ;
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ @java.lang.Override
+ public double getDowntime() {
+ return downtime_;
+ }
+ /**
+ * <code>double downtime = 7;</code>
+ * @param value The downtime to set.
+ * @return This builder for chaining.
+ */
+ public Builder setDowntime(double value) {
+
+ downtime_ = value;
+ bitField0_ |= 0x00000040;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>double downtime = 7;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearDowntime() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ downtime_ = 0D;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:proto.ProtoExport)
}
@@ -927,11 +1075,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" +
- "timestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013task" +
- "sactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023\n" +
- "\013energyusage\030\005 \001(\001B$\n\021org.opendc.commonB" +
- "\017ProtobufMetricsb\006proto3"
+ "\n\014schema.proto\022\005proto\"\225\001\n\013ProtoExport\022\021\n" +
+ "\ttimestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013tas" +
+ "ksactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023" +
+ "\n\013energyusage\030\005 \001(\001\022\016\n\006uptime\030\006 \001(\001\022\020\n\010d" +
+ "owntime\030\007 \001(\001B$\n\021org.opendc.commonB\017Prot" +
+ "obufMetricsb\006proto3"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -942,7 +1091,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
internal_static_proto_ProtoExport_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_proto_ProtoExport_descriptor,
- new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", });
+ new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", "Uptime", "Downtime", });
descriptor.resolveAllFeaturesImmutable();
}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
index b659d40a..bf674a33 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
@@ -4,40 +4,68 @@ import com.fasterxml.jackson.dataformat.toml.TomlMapper
import redis.clients.jedis.RedisClient
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.params.XReadParams
+import java.util.Map
import java.util.Properties
+
/**
* This class represents the Redis server instance.
* @author Mateusz Kwiatkowski
* @see <a href=https://redis.io/docs/latest/>https://redis.io/docs/latest/</a>
*
* @see <a href=https://redis.io/docs/latest/develop/data-types/streams/>https://redis.io/docs/latest/develop/data-types/streams/</a>
+ *
+ * @see <a href=https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html>https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html</a>
*/
@Suppress("DEPRECATION")
-public class Redis {
- private var properties : Properties
+public class Redis private constructor() {
+ private var jedis : RedisClient? = null
+ private var properties : Properties? = null
init {
- properties = TomlMapper().readerFor(Properties().javaClass)
- .readValue(Kafka::class.java.getResource("/producer.toml"))
+ properties = TomlMapper().readerFor(Properties().javaClass).readValue(Kafka::class.java.getResource("/subscriber.toml"))
+ jedis = RedisClient.create("redis://localhost:${properties?.getProperty("port")}")
}
- public fun run() {
- val jedis : RedisClient = RedisClient.create("redis://localhost:6379")
+ public companion object {
+ private var instance: Redis? = null
- val res5 = jedis.xread(
- XReadParams.xReadParams().block(300).count(100),
- object : HashMap<String?, StreamEntryID?>() {
- init {
- put("${properties.getProperty("table")}", StreamEntryID())
+ public fun getInstance(): Redis? {
+ if (instance == null) {
+ instance = Redis()
+ }
+ return instance
+ }
+ }
+
+ public fun readOne() {
+ while(true) {
+ val messages = jedis?.xread(
+ XReadParams.xReadParams(),
+ Map.of("${properties?.getProperty("table")}", StreamEntryID()
+ ));
+
+ if (messages != null) {
+ for (stream in messages) {
+ for (entry in stream.value) {
+ if(entry.getFields().get("downtime") != null) {
+ entry.getFields().get("downtime")?.toDouble()?.let {
+ if (it > 0) {
+ println("ID: " + entry.getID())
+ }
+ }
+ }
}
- })
+ }
+ }
+ }
+ // https://redis.io/docs/latest/develop/use-cases/streaming/java-jedis/
+ // In Redis you can subscribe to updates to a stream.
+ // You should base your application off this.
+ // You can listen for new items with XREAD
- // in Redis you can subscribe to updates to a stream.
- // you should base your application off this.
- // you can listen for new items with XREAD
- println(res5)
- jedis.close()
+ // do not close for now
+ //jedis.close()
}
-} \ No newline at end of file
+}
diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto
index d0aa18d5..ea515a39 100644
--- a/opendc-common/src/main/resources/schema.proto
+++ b/opendc-common/src/main/resources/schema.proto
@@ -11,4 +11,6 @@ message ProtoExport {
int32 tasksactive = 3;
double cpuutilization = 4;
double energyusage = 5;
-} \ No newline at end of file
+ double uptime = 6;
+ double downtime = 7;
+}
diff --git a/opendc-common/src/main/resources/subscriber.toml b/opendc-common/src/main/resources/subscriber.toml
new file mode 100644
index 00000000..75dece02
--- /dev/null
+++ b/opendc-common/src/main/resources/subscriber.toml
@@ -0,0 +1,3 @@
+"table" = "postgres_topic"
+"server" = "localhost"
+"port" = "6379"