summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormjkwiatkowski <mati.rewa@gmail.com>2026-06-17 18:08:56 +0200
committermjkwiatkowski <mati.rewa@gmail.com>2026-06-17 18:08:56 +0200
commit4562f52c9b540944200b33d4ffbd60b3cbc5ee79 (patch)
tree82d028faf5a0555cb80ce9602890a3257ef695c2
parent78a9d920cc8aca951aff798272b0d5e7a2e356b9 (diff)
feat: managed to retrieve metric-by-metric from Redis in real timeHEADmaster
-rw-r--r--opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java161
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt64
-rw-r--r--opendc-common/src/main/resources/schema.proto4
-rw-r--r--opendc-common/src/main/resources/subscriber.toml3
-rw-r--r--opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt4
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt5
-rw-r--r--output/greenifier-demo-scaling/trackr.json4
-rw-r--r--resources/experiments/experiment_failures.json26
-rwxr-xr-xresources/failures/Skype_user_reported.parquetbin0 -> 3557 bytes
-rw-r--r--shell_scripts/useful_commands.sh19
13 files changed, 266 insertions, 31 deletions
diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
index c0f15f63..b65155ca 100644
--- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
+++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
@@ -71,6 +71,18 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
* @return The energyusage.
*/
double getEnergyusage();
+
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ double getUptime();
+
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ double getDowntime();
}
/**
* Protobuf type {@code proto.ProtoExport}
@@ -227,6 +239,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return energyusage_;
}
+ public static final int UPTIME_FIELD_NUMBER = 6;
+ private double uptime_ = 0D;
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ @java.lang.Override
+ public double getUptime() {
+ return uptime_;
+ }
+
+ public static final int DOWNTIME_FIELD_NUMBER = 7;
+ private double downtime_ = 0D;
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ @java.lang.Override
+ public double getDowntime() {
+ return downtime_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -256,6 +290,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToRawLongBits(energyusage_) != 0) {
output.writeDouble(5, energyusage_);
}
+ if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) {
+ output.writeDouble(6, uptime_);
+ }
+ if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) {
+ output.writeDouble(7, downtime_);
+ }
getUnknownFields().writeTo(output);
}
private int computeSerializedSize_0() {
@@ -278,6 +318,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
size += com.google.protobuf.CodedOutputStream
.computeDoubleSize(5, energyusage_);
}
+ if (java.lang.Double.doubleToRawLongBits(uptime_) != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(6, uptime_);
+ }
+ if (java.lang.Double.doubleToRawLongBits(downtime_) != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(7, downtime_);
+ }
return size;
}
@java.lang.Override
@@ -314,6 +362,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToLongBits(getEnergyusage())
!= java.lang.Double.doubleToLongBits(
other.getEnergyusage())) return false;
+ if (java.lang.Double.doubleToLongBits(getUptime())
+ != java.lang.Double.doubleToLongBits(
+ other.getUptime())) return false;
+ if (java.lang.Double.doubleToLongBits(getDowntime())
+ != java.lang.Double.doubleToLongBits(
+ other.getDowntime())) return false;
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -337,6 +391,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER;
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
java.lang.Double.doubleToLongBits(getEnergyusage()));
+ hash = (37 * hash) + UPTIME_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getUptime()));
+ hash = (37 * hash) + DOWNTIME_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getDowntime()));
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -473,6 +533,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
tasksactive_ = 0;
cpuutilization_ = 0D;
energyusage_ = 0D;
+ uptime_ = 0D;
+ downtime_ = 0D;
return this;
}
@@ -521,6 +583,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (((from_bitField0_ & 0x00000010) != 0)) {
result.energyusage_ = energyusage_;
}
+ if (((from_bitField0_ & 0x00000020) != 0)) {
+ result.uptime_ = uptime_;
+ }
+ if (((from_bitField0_ & 0x00000040) != 0)) {
+ result.downtime_ = downtime_;
+ }
}
@java.lang.Override
@@ -554,6 +622,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (java.lang.Double.doubleToRawLongBits(other.getEnergyusage()) != 0) {
setEnergyusage(other.getEnergyusage());
}
+ if (java.lang.Double.doubleToRawLongBits(other.getUptime()) != 0) {
+ setUptime(other.getUptime());
+ }
+ if (java.lang.Double.doubleToRawLongBits(other.getDowntime()) != 0) {
+ setDowntime(other.getDowntime());
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -605,6 +679,16 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
bitField0_ |= 0x00000010;
break;
} // case 41
+ case 49: {
+ uptime_ = input.readDouble();
+ bitField0_ |= 0x00000020;
+ break;
+ } // case 49
+ case 57: {
+ downtime_ = input.readDouble();
+ bitField0_ |= 0x00000040;
+ break;
+ } // case 57
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag)) {
done = true; // was an endgroup tag
@@ -862,6 +946,70 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return this;
}
+ private double uptime_ ;
+ /**
+ * <code>double uptime = 6;</code>
+ * @return The uptime.
+ */
+ @java.lang.Override
+ public double getUptime() {
+ return uptime_;
+ }
+ /**
+ * <code>double uptime = 6;</code>
+ * @param value The uptime to set.
+ * @return This builder for chaining.
+ */
+ public Builder setUptime(double value) {
+
+ uptime_ = value;
+ bitField0_ |= 0x00000020;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>double uptime = 6;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearUptime() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ uptime_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ private double downtime_ ;
+ /**
+ * <code>double downtime = 7;</code>
+ * @return The downtime.
+ */
+ @java.lang.Override
+ public double getDowntime() {
+ return downtime_;
+ }
+ /**
+ * <code>double downtime = 7;</code>
+ * @param value The downtime to set.
+ * @return This builder for chaining.
+ */
+ public Builder setDowntime(double value) {
+
+ downtime_ = value;
+ bitField0_ |= 0x00000040;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>double downtime = 7;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearDowntime() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ downtime_ = 0D;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:proto.ProtoExport)
}
@@ -927,11 +1075,12 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" +
- "timestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013task" +
- "sactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023\n" +
- "\013energyusage\030\005 \001(\001B$\n\021org.opendc.commonB" +
- "\017ProtobufMetricsb\006proto3"
+ "\n\014schema.proto\022\005proto\"\225\001\n\013ProtoExport\022\021\n" +
+ "\ttimestamp\030\001 \001(\t\022\017\n\007host_id\030\002 \001(\t\022\023\n\013tas" +
+ "ksactive\030\003 \001(\005\022\026\n\016cpuutilization\030\004 \001(\001\022\023" +
+ "\n\013energyusage\030\005 \001(\001\022\016\n\006uptime\030\006 \001(\001\022\020\n\010d" +
+ "owntime\030\007 \001(\001B$\n\021org.opendc.commonB\017Prot" +
+ "obufMetricsb\006proto3"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -942,7 +1091,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
internal_static_proto_ProtoExport_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_proto_ProtoExport_descriptor,
- new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", });
+ new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", "Uptime", "Downtime", });
descriptor.resolveAllFeaturesImmutable();
}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
index b659d40a..bf674a33 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Redis.kt
@@ -4,40 +4,68 @@ import com.fasterxml.jackson.dataformat.toml.TomlMapper
import redis.clients.jedis.RedisClient
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.params.XReadParams
+import java.util.Map
import java.util.Properties
+
/**
* This class represents the Redis server instance.
* @author Mateusz Kwiatkowski
* @see <a href=https://redis.io/docs/latest/>https://redis.io/docs/latest/</a>
*
* @see <a href=https://redis.io/docs/latest/develop/data-types/streams/>https://redis.io/docs/latest/develop/data-types/streams/</a>
+ *
+ * @see <a href=https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html>https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html</a>
*/
@Suppress("DEPRECATION")
-public class Redis {
- private var properties : Properties
+public class Redis private constructor() {
+ private var jedis : RedisClient? = null
+ private var properties : Properties? = null
init {
- properties = TomlMapper().readerFor(Properties().javaClass)
- .readValue(Kafka::class.java.getResource("/producer.toml"))
+ properties = TomlMapper().readerFor(Properties().javaClass).readValue(Kafka::class.java.getResource("/subscriber.toml"))
+ jedis = RedisClient.create("redis://localhost:${properties?.getProperty("port")}")
}
- public fun run() {
- val jedis : RedisClient = RedisClient.create("redis://localhost:6379")
+ public companion object {
+ private var instance: Redis? = null
- val res5 = jedis.xread(
- XReadParams.xReadParams().block(300).count(100),
- object : HashMap<String?, StreamEntryID?>() {
- init {
- put("${properties.getProperty("table")}", StreamEntryID())
+ public fun getInstance(): Redis? {
+ if (instance == null) {
+ instance = Redis()
+ }
+ return instance
+ }
+ }
+
+ public fun readOne() {
+ while(true) {
+ val messages = jedis?.xread(
+ XReadParams.xReadParams(),
+ Map.of("${properties?.getProperty("table")}", StreamEntryID()
+ ));
+
+ if (messages != null) {
+ for (stream in messages) {
+ for (entry in stream.value) {
+ if(entry.getFields().get("downtime") != null) {
+ entry.getFields().get("downtime")?.toDouble()?.let {
+ if (it > 0) {
+ println("ID: " + entry.getID())
+ }
+ }
+ }
}
- })
+ }
+ }
+ }
+ // https://redis.io/docs/latest/develop/use-cases/streaming/java-jedis/
+ // In Redis you can subscribe to updates to a stream.
+ // You should base your application off this.
+ // You can listen for new items with XREAD
- // in Redis you can subscribe to updates to a stream.
- // you should base your application off this.
- // you can listen for new items with XREAD
- println(res5)
- jedis.close()
+ // do not close for now
+ //jedis.close()
}
-} \ No newline at end of file
+}
diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto
index d0aa18d5..ea515a39 100644
--- a/opendc-common/src/main/resources/schema.proto
+++ b/opendc-common/src/main/resources/schema.proto
@@ -11,4 +11,6 @@ message ProtoExport {
int32 tasksactive = 3;
double cpuutilization = 4;
double energyusage = 5;
-} \ No newline at end of file
+ double uptime = 6;
+ double downtime = 7;
+}
diff --git a/opendc-common/src/main/resources/subscriber.toml b/opendc-common/src/main/resources/subscriber.toml
new file mode 100644
index 00000000..75dece02
--- /dev/null
+++ b/opendc-common/src/main/resources/subscriber.toml
@@ -0,0 +1,3 @@
+"table" = "postgres_topic"
+"server" = "localhost"
+"port" = "6379"
diff --git a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt
index 3bd253da..6cb02bb8 100644
--- a/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt
+++ b/opendc-compute/opendc-compute-failure/src/main/kotlin/org/opendc/compute/failure/models/TraceBasedFailureModel.kt
@@ -98,6 +98,10 @@ public class TraceBasedFailureModel(
pathToFile: String,
startPoint: Double,
): List<Failure> {
+ /*
+ I assume this returns a file descriptor.
+ @Mateusz Kwiatkowski
+ */
val trace = Trace.open(File(pathToFile), "failure")
val reader = checkNotNull(trace.getTable(TABLE_FAILURES)).newReader()
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
index baadd806..0edd97cf 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
@@ -60,9 +60,6 @@ public class SmartScheduler : ComputeScheduler {
return SchedulingResult(SchedulingResultType.EMPTY)
}
- // Benefits of a digital twin: during operations you make sure what is happening in the real world.
- // The use-case is making split-second automated decisions before operators can make them.
- // Make a strong case for making a Digital Twin.
override fun removeTask(
task: ServiceTask,
host: HostView?,
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
index c8368af2..ddd4a28a 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
@@ -47,6 +47,8 @@ public class KafkaComputeMonitor : ComputeMonitor {
.setTasksactive(reader.tasksActive)
.setCpuutilization(reader.cpuUtilization)
.setEnergyusage(reader.energyUsage)
+ .setUptime(reader.uptime.toDouble())
+ .setDowntime(reader.downtime.toDouble())
.build()
this.send(packet)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index 1fe597ea..55aed21b 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -49,7 +49,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
override fun run() {
try {
val experiment = getExperiment(experimentPath)
- HTTPClient.getInstance()?.sendExperiment(experimentPath)
+ //HTTPClient.getInstance()?.sendExperiment(experimentPath)
runExperiment(experiment)
} catch (e: IOException) {
println("${e.message}")
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt
index de759c0f..c221543d 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentListener.kt
@@ -14,6 +14,7 @@ import org.opendc.common.utils.Redis
*/
public fun runListener() {
PostgresqlDB()
- Redis().run()
+ val redisClient = Redis.getInstance()
+ redisClient?.readOne()
JavalinRunner()
-} \ No newline at end of file
+}
diff --git a/output/greenifier-demo-scaling/trackr.json b/output/greenifier-demo-scaling/trackr.json
index a595beff..f2eb187e 100644
--- a/output/greenifier-demo-scaling/trackr.json
+++ b/output/greenifier-demo-scaling/trackr.json
@@ -16,5 +16,9 @@
"battery": true,
"service": true
}
+ },
+ "failureModel": {
+ "type": "trace-based",
+ "pathToFile": "resources/failures/Skype_user_reported.parquet"
}
}] \ No newline at end of file
diff --git a/resources/experiments/experiment_failures.json b/resources/experiments/experiment_failures.json
new file mode 100644
index 00000000..4fd5cdf6
--- /dev/null
+++ b/resources/experiments/experiment_failures.json
@@ -0,0 +1,26 @@
+{
+ "name": "greenifier-demo-scaling",
+ "topologies": [
+ {
+ "pathToFile": "resources/topologies/surf.json"
+ }
+ ],
+ "workloads": [
+ {
+ "pathToFile": "resources/workloads/surf_month",
+ "type": "ComputeWorkload"
+ }
+ ],
+
+ "failureModels" : [
+ {
+ "pathToFile": "resources/failures/Skype_user_reported.parquet",
+ "type": "trace-based"
+ }
+ ],
+ "exportModels": [
+ {
+ "exportInterval": 3600
+ }
+ ]
+}
diff --git a/resources/failures/Skype_user_reported.parquet b/resources/failures/Skype_user_reported.parquet
new file mode 100755
index 00000000..316706c7
--- /dev/null
+++ b/resources/failures/Skype_user_reported.parquet
Binary files differ
diff --git a/shell_scripts/useful_commands.sh b/shell_scripts/useful_commands.sh
new file mode 100644
index 00000000..8ff604e2
--- /dev/null
+++ b/shell_scripts/useful_commands.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+doas su postgres
+# In this project we only use the ``opendc'' database.
+psql -d opendc
+
+# Queries
+# Selects everything from the topic which Kafka writes to.
+SELECT * FROM postgres_topic;
+
+# Redis CLI
+# Connect to Redis
+redis-cli -h localhost -p 6379
+# Very useful tutorial https://redis.io/docs/latest/develop/data-types/streams/
+# https://redis.io/docs/latest/develop/tools/cli/
+# Returns all items in the Redis stream
+XRANGE postgres_topic - +
+XTRIM postgres_topic MAXLEN 0
+# https://codesignal.com/learn/courses/mastering-redis-for-high-performance-applications-with-java-and-jedis-1/lessons/redis-streams-with-java