author    mjkwiatkowski <mati.rewa@gmail.com>  2026-02-20 16:17:39 +0100
committer mjkwiatkowski <mati.rewa@gmail.com>  2026-02-20 16:17:39 +0100
commit    f5da60e4275ca1172128c3994298691e12d5e1f8 (patch)
tree      189804251bf88bf390e1c9ffb4472b7a798d7f22
parent    2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (diff)
fix: changed the syntax to slowly get rid of the Config class (HEAD, master)
-rw-r--r--  README.md  23
-rw-r--r--  opendc-common/build.gradle.kts  3
-rw-r--r--  opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java  630
-rw-r--r--  opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt  13
-rw-r--r--  opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt  67
-rw-r--r--  opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt  63
-rw-r--r--  opendc-common/src/main/resources/consumer.toml  6
-rw-r--r--  opendc-common/src/main/resources/database.toml  5
-rw-r--r--  opendc-common/src/main/resources/log4j2.xml  1
-rw-r--r--  opendc-common/src/main/resources/producer.toml  5
-rw-r--r--  opendc-common/src/main/resources/schema.proto  14
-rw-r--r--  opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt  5
-rw-r--r--  opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt  77
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt  12
-rw-r--r--  opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt  9
-rw-r--r--  output/greenifier-demo-scaling/trackr.json  40
-rw-r--r--  resources/experiments/config.json  3
-rw-r--r--  resources/experiments/config.toml  11
-rw-r--r--  resources/experiments/experiment_scaling.json  9
-rw-r--r--  resources/experiments/schema.proto  11
-rw-r--r--  resources/experiments/sink-jdbc.properties  5
-rw-r--r--  resources/topologies/surf_small_solution.json  25
22 files changed, 747 insertions, 290 deletions
diff --git a/README.md b/README.md
index 5267b498..55448b73 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
### Dependencies
Paths are hardcoded.
+Be aware that the project only works with this version of OpenDC, as there are hidden Maven dependencies.
+The Kafka topic must be named `postgres_topic`.
Confluent local (see https://www.confluent.io/installation/):
@@ -8,10 +10,9 @@ Confluent local (see https://www.confluent.io/installation/):
export CONFLUENT_HOME=/opt/confluent
export PATH=/opt/confluent/bin:$PATH
cd /opt/confluent
-kafka-storage.sh random-uuid
-kafka-storage.sh format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c etc/kafka/server.properties
-kafka-server-start.sh etc/kafka/server.properties
-kafka-server-start etc/kafka/server.properties
+kafka-storage random-uuid
+kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone
+kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties
```
@@ -20,8 +21,8 @@ Confluent JDBC sink and source (includes Postgres connector)
(see https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc)
Be mindful to configure the right `plugin.path` in `etc/kafka/connect-standalone.properties`
```bash
-ln -s $HOME/git/opendc/resources/experiments/sink-jdbc.properties opt/confluent/share/confluent-common/connectors/sink-jdbc.properties
-```
+ln -s /home/matt/git/opendc/resources/experiments/sink-jdbc.properties /opt/confluent/share/confluent-common/connectors
+```
Protobuf:
@@ -31,6 +32,7 @@ You need to run this each time you change `schema.proto`
```bash
cd resources/experiments/
protoc --java_out=/home/matt/git/opendc/opendc-common/src/main/java/ schema.proto
+curl -X DELETE http://localhost:8081/subjects/postgres-topic-value
```
Postgresql:
@@ -45,11 +47,10 @@ touch .s.PGSQL.5432.lock
chown -R postgres:postgres /run/postgresql
```
-
Random
```bash
-bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres-topic
-bin/kafka-topics.sh --create --topic postgres-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
+bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres_topic
+bin/kafka-topics.sh --create --topic postgres_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
-bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres-topic --from-beginning
-```
\ No newline at end of file
+bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres_topic --from-beginning
+```
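
Note: the `curl -X DELETE` line added above drops the stale `postgres-topic-value` subject from the Schema Registry so the renamed topic can register a fresh schema. A minimal sketch for verifying the result, assuming the registry's standard REST API on port 8081 (this helper is not part of the commit):

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// List the subjects still registered after the DELETE above;
// the old postgres-topic-value entry should no longer appear.
fun main() {
    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/subjects"))
        .GET()
        .build()
    val response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body()) // e.g. ["postgres_topic-value"]
}
```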
diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts
index 4d9e8b54..0c87303c 100644
--- a/opendc-common/build.gradle.kts
+++ b/opendc-common/build.gradle.kts
@@ -62,6 +62,9 @@ dependencies {
// Source: https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer
implementation("io.confluent:kafka-protobuf-serializer:8.1.1")
+
+ // Source: https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-toml
+ implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-toml:2.21.0")
}
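
Note: this new dependency is what lets the Kotlin utilities below read `producer.toml`, `consumer.toml`, and `database.toml` straight into `java.util.Properties`. A sketch of the pattern, assuming the TOML file is on the classpath (`loadToml` is illustrative, not part of the commit):

```kotlin
import com.fasterxml.jackson.dataformat.toml.TomlMapper
import java.util.Properties

// Deserialize a classpath TOML resource into java.util.Properties,
// mirroring how Kafka.kt and PostgresqlDB.kt use the new dependency.
fun loadToml(resource: String): Properties =
    TomlMapper().readerFor(Properties::class.java)
        .readValue(object {}.javaClass.getResource(resource))

fun main() {
    val props = loadToml("/producer.toml")
    println(props.getProperty("bootstrap.servers")) // 127.0.0.1:9092
}
```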
diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
index 0ec97cd0..7f7a3874 100644
--- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
+++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java
@@ -31,26 +31,71 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
com.google.protobuf.MessageOrBuilder {
/**
- * <code>required int32 id = 1;</code>
- * @return Whether the id field is set.
+ * <code>required string timestamp = 1;</code>
+ * @return Whether the timestamp field is set.
*/
- boolean hasId();
+ boolean hasTimestamp();
/**
- * <code>required int32 id = 1;</code>
- * @return The id.
+ * <code>required string timestamp = 1;</code>
+ * @return The timestamp.
*/
- int getId();
+ java.lang.String getTimestamp();
+ /**
+ * <code>required string timestamp = 1;</code>
+ * @return The bytes for timestamp.
+ */
+ com.google.protobuf.ByteString
+ getTimestampBytes();
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required string host_id = 2;</code>
+ * @return Whether the hostId field is set.
+ */
+ boolean hasHostId();
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The hostId.
+ */
+ java.lang.String getHostId();
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The bytes for hostId.
+ */
+ com.google.protobuf.ByteString
+ getHostIdBytes();
+
+ /**
+ * <code>required int32 tasksactive = 3;</code>
* @return Whether the tasksactive field is set.
*/
boolean hasTasksactive();
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return The tasksactive.
*/
int getTasksactive();
+
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return Whether the cpuutilization field is set.
+ */
+ boolean hasCpuutilization();
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return The cpuutilization.
+ */
+ double getCpuutilization();
+
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return Whether the energyusage field is set.
+ */
+ boolean hasEnergyusage();
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return The energyusage.
+ */
+ double getEnergyusage();
}
/**
* Protobuf type {@code proto.ProtoExport}
@@ -74,6 +119,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
super(builder);
}
private ProtoExport() {
+ timestamp_ = "";
+ hostId_ = "";
}
public static final com.google.protobuf.Descriptors.Descriptor
@@ -90,37 +137,116 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
}
private int bitField0_;
- public static final int ID_FIELD_NUMBER = 1;
- private int id_ = 0;
+ public static final int TIMESTAMP_FIELD_NUMBER = 1;
+ @SuppressWarnings("serial")
+ private volatile java.lang.Object timestamp_ = "";
/**
- * <code>required int32 id = 1;</code>
- * @return Whether the id field is set.
+ * <code>required string timestamp = 1;</code>
+ * @return Whether the timestamp field is set.
*/
@java.lang.Override
- public boolean hasId() {
+ public boolean hasTimestamp() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
- * <code>required int32 id = 1;</code>
- * @return The id.
+ * <code>required string timestamp = 1;</code>
+ * @return The timestamp.
+ */
+ @java.lang.Override
+ public java.lang.String getTimestamp() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ timestamp_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>required string timestamp = 1;</code>
+ * @return The bytes for timestamp.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString
+ getTimestampBytes() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ timestamp_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ public static final int HOST_ID_FIELD_NUMBER = 2;
+ @SuppressWarnings("serial")
+ private volatile java.lang.Object hostId_ = "";
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return Whether the hostId field is set.
+ */
+ @java.lang.Override
+ public boolean hasHostId() {
+ return ((bitField0_ & 0x00000002) != 0);
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The hostId.
+ */
+ @java.lang.Override
+ public java.lang.String getHostId() {
+ java.lang.Object ref = hostId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ hostId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The bytes for hostId.
*/
@java.lang.Override
- public int getId() {
- return id_;
+ public com.google.protobuf.ByteString
+ getHostIdBytes() {
+ java.lang.Object ref = hostId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ hostId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
}
- public static final int TASKSACTIVE_FIELD_NUMBER = 2;
+ public static final int TASKSACTIVE_FIELD_NUMBER = 3;
private int tasksactive_ = 0;
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return Whether the tasksactive field is set.
*/
@java.lang.Override
public boolean hasTasksactive() {
- return ((bitField0_ & 0x00000002) != 0);
+ return ((bitField0_ & 0x00000004) != 0);
}
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return The tasksactive.
*/
@java.lang.Override
@@ -128,6 +254,44 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return tasksactive_;
}
+ public static final int CPUUTILIZATION_FIELD_NUMBER = 4;
+ private double cpuutilization_ = 0D;
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return Whether the cpuutilization field is set.
+ */
+ @java.lang.Override
+ public boolean hasCpuutilization() {
+ return ((bitField0_ & 0x00000008) != 0);
+ }
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return The cpuutilization.
+ */
+ @java.lang.Override
+ public double getCpuutilization() {
+ return cpuutilization_;
+ }
+
+ public static final int ENERGYUSAGE_FIELD_NUMBER = 5;
+ private double energyusage_ = 0D;
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return Whether the energyusage field is set.
+ */
+ @java.lang.Override
+ public boolean hasEnergyusage() {
+ return ((bitField0_ & 0x00000010) != 0);
+ }
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return The energyusage.
+ */
+ @java.lang.Override
+ public double getEnergyusage() {
+ return energyusage_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -135,7 +299,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
if (isInitialized == 1) return true;
if (isInitialized == 0) return false;
- if (!hasId()) {
+ if (!hasTimestamp()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasHostId()) {
memoizedIsInitialized = 0;
return false;
}
@@ -143,6 +311,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
memoizedIsInitialized = 0;
return false;
}
+ if (!hasCpuutilization()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasEnergyusage()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -151,10 +327,19 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (((bitField0_ & 0x00000001) != 0)) {
- output.writeInt32(1, id_);
+ com.google.protobuf.GeneratedMessage.writeString(output, 1, timestamp_);
}
if (((bitField0_ & 0x00000002) != 0)) {
- output.writeInt32(2, tasksactive_);
+ com.google.protobuf.GeneratedMessage.writeString(output, 2, hostId_);
+ }
+ if (((bitField0_ & 0x00000004) != 0)) {
+ output.writeInt32(3, tasksactive_);
+ }
+ if (((bitField0_ & 0x00000008) != 0)) {
+ output.writeDouble(4, cpuutilization_);
+ }
+ if (((bitField0_ & 0x00000010) != 0)) {
+ output.writeDouble(5, energyusage_);
}
getUnknownFields().writeTo(output);
}
@@ -166,12 +351,22 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
size = 0;
if (((bitField0_ & 0x00000001) != 0)) {
- size += com.google.protobuf.CodedOutputStream
- .computeInt32Size(1, id_);
+ size += com.google.protobuf.GeneratedMessage.computeStringSize(1, timestamp_);
}
if (((bitField0_ & 0x00000002) != 0)) {
+ size += com.google.protobuf.GeneratedMessage.computeStringSize(2, hostId_);
+ }
+ if (((bitField0_ & 0x00000004) != 0)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(3, tasksactive_);
+ }
+ if (((bitField0_ & 0x00000008) != 0)) {
size += com.google.protobuf.CodedOutputStream
- .computeInt32Size(2, tasksactive_);
+ .computeDoubleSize(4, cpuutilization_);
+ }
+ if (((bitField0_ & 0x00000010) != 0)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeDoubleSize(5, energyusage_);
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@@ -188,16 +383,33 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
}
org.opendc.common.ProtobufMetrics.ProtoExport other = (org.opendc.common.ProtobufMetrics.ProtoExport) obj;
- if (hasId() != other.hasId()) return false;
- if (hasId()) {
- if (getId()
- != other.getId()) return false;
+ if (hasTimestamp() != other.hasTimestamp()) return false;
+ if (hasTimestamp()) {
+ if (!getTimestamp()
+ .equals(other.getTimestamp())) return false;
+ }
+ if (hasHostId() != other.hasHostId()) return false;
+ if (hasHostId()) {
+ if (!getHostId()
+ .equals(other.getHostId())) return false;
}
if (hasTasksactive() != other.hasTasksactive()) return false;
if (hasTasksactive()) {
if (getTasksactive()
!= other.getTasksactive()) return false;
}
+ if (hasCpuutilization() != other.hasCpuutilization()) return false;
+ if (hasCpuutilization()) {
+ if (java.lang.Double.doubleToLongBits(getCpuutilization())
+ != java.lang.Double.doubleToLongBits(
+ other.getCpuutilization())) return false;
+ }
+ if (hasEnergyusage() != other.hasEnergyusage()) return false;
+ if (hasEnergyusage()) {
+ if (java.lang.Double.doubleToLongBits(getEnergyusage())
+ != java.lang.Double.doubleToLongBits(
+ other.getEnergyusage())) return false;
+ }
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -209,14 +421,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
- if (hasId()) {
- hash = (37 * hash) + ID_FIELD_NUMBER;
- hash = (53 * hash) + getId();
+ if (hasTimestamp()) {
+ hash = (37 * hash) + TIMESTAMP_FIELD_NUMBER;
+ hash = (53 * hash) + getTimestamp().hashCode();
+ }
+ if (hasHostId()) {
+ hash = (37 * hash) + HOST_ID_FIELD_NUMBER;
+ hash = (53 * hash) + getHostId().hashCode();
}
if (hasTasksactive()) {
hash = (37 * hash) + TASKSACTIVE_FIELD_NUMBER;
hash = (53 * hash) + getTasksactive();
}
+ if (hasCpuutilization()) {
+ hash = (37 * hash) + CPUUTILIZATION_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getCpuutilization()));
+ }
+ if (hasEnergyusage()) {
+ hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER;
+ hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
+ java.lang.Double.doubleToLongBits(getEnergyusage()));
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -348,8 +574,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
public Builder clear() {
super.clear();
bitField0_ = 0;
- id_ = 0;
+ timestamp_ = "";
+ hostId_ = "";
tasksactive_ = 0;
+ cpuutilization_ = 0D;
+ energyusage_ = 0D;
return this;
}
@@ -385,13 +614,25 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) != 0)) {
- result.id_ = id_;
+ result.timestamp_ = timestamp_;
to_bitField0_ |= 0x00000001;
}
if (((from_bitField0_ & 0x00000002) != 0)) {
- result.tasksactive_ = tasksactive_;
+ result.hostId_ = hostId_;
to_bitField0_ |= 0x00000002;
}
+ if (((from_bitField0_ & 0x00000004) != 0)) {
+ result.tasksactive_ = tasksactive_;
+ to_bitField0_ |= 0x00000004;
+ }
+ if (((from_bitField0_ & 0x00000008) != 0)) {
+ result.cpuutilization_ = cpuutilization_;
+ to_bitField0_ |= 0x00000008;
+ }
+ if (((from_bitField0_ & 0x00000010) != 0)) {
+ result.energyusage_ = energyusage_;
+ to_bitField0_ |= 0x00000010;
+ }
result.bitField0_ |= to_bitField0_;
}
@@ -407,12 +648,25 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
public Builder mergeFrom(org.opendc.common.ProtobufMetrics.ProtoExport other) {
if (other == org.opendc.common.ProtobufMetrics.ProtoExport.getDefaultInstance()) return this;
- if (other.hasId()) {
- setId(other.getId());
+ if (other.hasTimestamp()) {
+ timestamp_ = other.timestamp_;
+ bitField0_ |= 0x00000001;
+ onChanged();
+ }
+ if (other.hasHostId()) {
+ hostId_ = other.hostId_;
+ bitField0_ |= 0x00000002;
+ onChanged();
}
if (other.hasTasksactive()) {
setTasksactive(other.getTasksactive());
}
+ if (other.hasCpuutilization()) {
+ setCpuutilization(other.getCpuutilization());
+ }
+ if (other.hasEnergyusage()) {
+ setEnergyusage(other.getEnergyusage());
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -420,12 +674,21 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
@java.lang.Override
public final boolean isInitialized() {
- if (!hasId()) {
+ if (!hasTimestamp()) {
+ return false;
+ }
+ if (!hasHostId()) {
return false;
}
if (!hasTasksactive()) {
return false;
}
+ if (!hasCpuutilization()) {
+ return false;
+ }
+ if (!hasEnergyusage()) {
+ return false;
+ }
return true;
}
@@ -445,16 +708,31 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
case 0:
done = true;
break;
- case 8: {
- id_ = input.readInt32();
+ case 10: {
+ timestamp_ = input.readBytes();
bitField0_ |= 0x00000001;
break;
- } // case 8
- case 16: {
- tasksactive_ = input.readInt32();
+ } // case 10
+ case 18: {
+ hostId_ = input.readBytes();
bitField0_ |= 0x00000002;
break;
- } // case 16
+ } // case 18
+ case 24: {
+ tasksactive_ = input.readInt32();
+ bitField0_ |= 0x00000004;
+ break;
+ } // case 24
+ case 33: {
+ cpuutilization_ = input.readDouble();
+ bitField0_ |= 0x00000008;
+ break;
+ } // case 33
+ case 41: {
+ energyusage_ = input.readDouble();
+ bitField0_ |= 0x00000010;
+ break;
+ } // case 41
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag)) {
done = true; // was an endgroup tag
@@ -472,57 +750,177 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
}
private int bitField0_;
- private int id_ ;
+ private java.lang.Object timestamp_ = "";
/**
- * <code>required int32 id = 1;</code>
- * @return Whether the id field is set.
+ * <code>required string timestamp = 1;</code>
+ * @return Whether the timestamp field is set.
*/
- @java.lang.Override
- public boolean hasId() {
+ public boolean hasTimestamp() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
- * <code>required int32 id = 1;</code>
- * @return The id.
+ * <code>required string timestamp = 1;</code>
+ * @return The timestamp.
*/
- @java.lang.Override
- public int getId() {
- return id_;
+ public java.lang.String getTimestamp() {
+ java.lang.Object ref = timestamp_;
+ if (!(ref instanceof java.lang.String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ timestamp_ = s;
+ }
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
}
/**
- * <code>required int32 id = 1;</code>
- * @param value The id to set.
+ * <code>required string timestamp = 1;</code>
+ * @return The bytes for timestamp.
+ */
+ public com.google.protobuf.ByteString
+ getTimestampBytes() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ timestamp_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>required string timestamp = 1;</code>
+ * @param value The timestamp to set.
* @return This builder for chaining.
*/
- public Builder setId(int value) {
-
- id_ = value;
+ public Builder setTimestamp(
+ java.lang.String value) {
+ if (value == null) { throw new NullPointerException(); }
+ timestamp_ = value;
bitField0_ |= 0x00000001;
onChanged();
return this;
}
/**
- * <code>required int32 id = 1;</code>
+ * <code>required string timestamp = 1;</code>
* @return This builder for chaining.
*/
- public Builder clearId() {
+ public Builder clearTimestamp() {
+ timestamp_ = getDefaultInstance().getTimestamp();
bitField0_ = (bitField0_ & ~0x00000001);
- id_ = 0;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required string timestamp = 1;</code>
+ * @param value The bytes for timestamp to set.
+ * @return This builder for chaining.
+ */
+ public Builder setTimestampBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) { throw new NullPointerException(); }
+ timestamp_ = value;
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return this;
+ }
+
+ private java.lang.Object hostId_ = "";
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return Whether the hostId field is set.
+ */
+ public boolean hasHostId() {
+ return ((bitField0_ & 0x00000002) != 0);
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The hostId.
+ */
+ public java.lang.String getHostId() {
+ java.lang.Object ref = hostId_;
+ if (!(ref instanceof java.lang.String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ hostId_ = s;
+ }
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return The bytes for hostId.
+ */
+ public com.google.protobuf.ByteString
+ getHostIdBytes() {
+ java.lang.Object ref = hostId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ hostId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @param value The hostId to set.
+ * @return This builder for chaining.
+ */
+ public Builder setHostId(
+ java.lang.String value) {
+ if (value == null) { throw new NullPointerException(); }
+ hostId_ = value;
+ bitField0_ |= 0x00000002;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearHostId() {
+ hostId_ = getDefaultInstance().getHostId();
+ bitField0_ = (bitField0_ & ~0x00000002);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required string host_id = 2;</code>
+ * @param value The bytes for hostId to set.
+ * @return This builder for chaining.
+ */
+ public Builder setHostIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) { throw new NullPointerException(); }
+ hostId_ = value;
+ bitField0_ |= 0x00000002;
onChanged();
return this;
}
private int tasksactive_ ;
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return Whether the tasksactive field is set.
*/
@java.lang.Override
public boolean hasTasksactive() {
- return ((bitField0_ & 0x00000002) != 0);
+ return ((bitField0_ & 0x00000004) != 0);
}
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return The tasksactive.
*/
@java.lang.Override
@@ -530,28 +928,108 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
return tasksactive_;
}
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @param value The tasksactive to set.
* @return This builder for chaining.
*/
public Builder setTasksactive(int value) {
tasksactive_ = value;
- bitField0_ |= 0x00000002;
+ bitField0_ |= 0x00000004;
onChanged();
return this;
}
/**
- * <code>required int32 tasksactive = 2;</code>
+ * <code>required int32 tasksactive = 3;</code>
* @return This builder for chaining.
*/
public Builder clearTasksactive() {
- bitField0_ = (bitField0_ & ~0x00000002);
+ bitField0_ = (bitField0_ & ~0x00000004);
tasksactive_ = 0;
onChanged();
return this;
}
+ private double cpuutilization_ ;
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return Whether the cpuutilization field is set.
+ */
+ @java.lang.Override
+ public boolean hasCpuutilization() {
+ return ((bitField0_ & 0x00000008) != 0);
+ }
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return The cpuutilization.
+ */
+ @java.lang.Override
+ public double getCpuutilization() {
+ return cpuutilization_;
+ }
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @param value The cpuutilization to set.
+ * @return This builder for chaining.
+ */
+ public Builder setCpuutilization(double value) {
+
+ cpuutilization_ = value;
+ bitField0_ |= 0x00000008;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required double cpuutilization = 4;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearCpuutilization() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ cpuutilization_ = 0D;
+ onChanged();
+ return this;
+ }
+
+ private double energyusage_ ;
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return Whether the energyusage field is set.
+ */
+ @java.lang.Override
+ public boolean hasEnergyusage() {
+ return ((bitField0_ & 0x00000010) != 0);
+ }
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return The energyusage.
+ */
+ @java.lang.Override
+ public double getEnergyusage() {
+ return energyusage_;
+ }
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @param value The energyusage to set.
+ * @return This builder for chaining.
+ */
+ public Builder setEnergyusage(double value) {
+
+ energyusage_ = value;
+ bitField0_ |= 0x00000010;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>required double energyusage = 5;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearEnergyusage() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ energyusage_ = 0D;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:proto.ProtoExport)
}
@@ -617,9 +1095,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\014schema.proto\022\005proto\".\n\013ProtoExport\022\n\n\002" +
- "id\030\001 \002(\005\022\023\n\013tasksactive\030\002 \002(\005B$\n\021org.ope" +
- "ndc.commonB\017ProtobufMetrics"
+ "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" +
+ "timestamp\030\001 \002(\t\022\017\n\007host_id\030\002 \002(\t\022\023\n\013task" +
+ "sactive\030\003 \002(\005\022\026\n\016cpuutilization\030\004 \002(\001\022\023\n" +
+ "\013energyusage\030\005 \002(\001B$\n\021org.opendc.commonB" +
+ "\017ProtobufMetrics"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -630,7 +1110,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile {
internal_static_proto_ProtoExport_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_proto_ProtoExport_descriptor,
- new java.lang.String[] { "Id", "Tasksactive", });
+ new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", });
descriptor.resolveAllFeaturesImmutable();
}
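
Note: the regenerated `ProtoExport` replaces the old `id` field with `timestamp` and `host_id` and adds `cpuutilization` and `energyusage`; `isInitialized()` fails unless all five fields are set. A quick round-trip sketch against the generated API (field values are made up):

```kotlin
import org.opendc.common.ProtobufMetrics

fun main() {
    // Build a fully populated message; every field is checked by isInitialized().
    val packet = ProtobufMetrics.ProtoExport.newBuilder()
        .setTimestamp("1700000000000")
        .setHostId("H01")
        .setTasksactive(8)
        .setCpuutilization(0.73)
        .setEnergyusage(205.5)
        .build()
    // Serialize and parse back; the wire format round-trips losslessly.
    val parsed = ProtobufMetrics.ProtoExport.parseFrom(packet.toByteArray())
    check(parsed.hostId == "H01" && parsed.tasksactive == 8)
}
```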
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
index e6f18da5..8261f6f0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
@@ -13,7 +13,6 @@ import java.net.Socket
import java.sql.Connection
/**
- * @author Mateusz
* @property name
* @property backlog the amount of connections to accept
* @property address IPv4 address
@@ -22,8 +21,14 @@ import java.sql.Connection
* @property username Postgresql user
* @property password Postgresql password
* @property database Postgresql database
- * @property topic Kafka topic
+ * @property topic Kafka topic and database table name
* @property kafka Kafka port
+ * @author Mateusz
+ */
+/*
+
+ Use `by lazy` here.
+ Use design patterns - singleton.
*/
@Serializable
public data class Config(
@@ -50,6 +55,10 @@ public data class Config(
public fun setConfigSocket(socket: Socket?){
this.socket = socket
+ // no try catch if the exception is not from Java
+ // do not use raw sockets, use a service for the communication
+ // use redis instead of HTTP GET (consider it, but not bound in stone)
+ // make an API KTor
try {
input = socket?.getInputStream()
output = socket?.getOutputStream()
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index 48590d9f..d7ccd385 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -1,41 +1,58 @@
package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
-
import java.util.*
+import kotlin.time.Duration.Companion.microseconds
+import kotlin.time.toJavaDuration
+/**
+ * Represents the Kafka interface.
+ * @property topic the Kafka topic
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
+ */
-public class Kafka (
- private val topic : String,
- address : String,
- port : Int,
-) {
- private val servers : String = "$address:$port"
- private var properties: Properties? = null
- private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null
+@Suppress("DEPRECATION")
+public class Kafka(private val topic: String) {
+ private var properties : Properties
init {
- properties = Properties()
- properties?.put("bootstrap.servers", servers)
- properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
- properties?.put("schema.registry.url", "http://localhost:8081")
- properties?.put("auto.register.schemas", "true")
+ check(!topic.contains("-"))
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(Kafka::class.java.getResource("/producer.toml"))
+ }
- try {
- producer = KafkaProducer(properties)
- } catch (e: Exception){
+ public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
+ val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
+ return fun (value : ProtobufMetrics.ProtoExport) {
+ try {
+ producer.send(ProducerRecord(this.topic, value))
+ } catch (e: Exception) {
println("${e.message}")
+ }
}
}
- public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
- return this.producer
- }
-
- public fun send(value : ProtobufMetrics.ProtoExport){
- producer?.send(ProducerRecord(this.topic, value))
+    // TODO: fix
+    public fun getReceive() : () -> Unit {
+        val consumerProperties : Properties = TomlMapper().readerFor(Properties().javaClass)
+            .readValue(Kafka::class.java.getResource("/consumer.toml"))
+        val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(consumerProperties)
+ return fun() : Unit {
+ try {
+ consumer.subscribe(listOf(topic))
+ while (true) {
+ consumer.poll(1.microseconds.toJavaDuration())
+ }
+ } catch (e: Exception) {
+ println("${e.message}")
+ }
+ }
}
-
}
\ No newline at end of file
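
Note: after this refactor callers no longer hold a `KafkaProducer`; `getSend()` closes over one producer and returns a plain `(ProtoExport) -> Unit`, which is how `KafkaComputeMonitor` uses it further down. A hedged usage sketch (broker and Schema Registry endpoints come from `producer.toml`):

```kotlin
import org.opendc.common.ProtobufMetrics
import org.opendc.common.utils.Kafka

fun main() {
    // getSend() builds the producer once; every call through the closure reuses it.
    val send = Kafka("postgres_topic").getSend()
    val packet = ProtobufMetrics.ProtoExport.newBuilder()
        .setTimestamp(System.currentTimeMillis().toString())
        .setHostId("H01")
        .setTasksactive(3)
        .setCpuutilization(0.42)
        .setEnergyusage(150.0)
        .build()
    send(packet) // serialized by KafkaProtobufSerializer onto postgres_topic
}
```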
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
index 69314ef3..361925ee 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
@@ -1,71 +1,52 @@
package org.opendc.common.utils
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
-
+import java.util.Properties
/**
* Represents the Postgresql database.
- * On setup cleans the entire database and creates empty tables.
+ * On setup cleans the entire database.
*
- * @author Mateusz
+ * @author Mateusz Kwiatkowski
*
- * @param address ipv4 address
- * @param port postgres post
- * @param dbName database name
- * @param user
- * @param password
+ * @see <a href=https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html>
+ * https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html</a>
*/
-public class PostgresqlDB(
- address : String,
- port : Int,
- dbName : String,
- private var user : String,
- private var password : String,
-) {
+@Suppress("DEPRECATION")
+public class PostgresqlDB {
+ private var properties = Properties()
private var connection : Connection? = null
- private var dbUrl : String = ""
init {
- dbUrl = "jdbc:postgresql://$address:$port/$dbName"
- println(dbUrl)
try {
- connection = DriverManager.getConnection(dbUrl, user, password)
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(PostgresqlDB::class.java.getResource("/database.toml"))
+ connection = DriverManager.getConnection(
+                properties.getProperty("address").asJdbc(properties.getProperty("database")),
+                properties.getProperty("username"),
+ properties.getProperty("password"))
clear()
- setup()
} catch (e: SQLException) {
print("${e.message}")
}
}
- public fun setup(){
- val CREATE_TABLE = """
- CREATE TABLE metrics (
- id SERIAL PRIMARY KEY,
- timestamp bigint,
- tasksActive integer,
- clusterName varchar(10));
- """.trimIndent()
-
- try {
- val conn = DriverManager.getConnection(dbUrl, user, password)
- val st = conn.createStatement()
- st.executeQuery(CREATE_TABLE)
- } catch (e: SQLException){
- println("${e.message}")
- }
- }
-
public fun clear(){
val DELETE_ALL_TABLES = """
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
""".trimIndent()
try {
- val conn = DriverManager.getConnection(dbUrl, user, password)
- val st = conn.createStatement()
- st.executeQuery(DELETE_ALL_TABLES)
+ val st = connection?.createStatement()
+            st?.execute(DELETE_ALL_TABLES)
} catch (e: SQLException){
println("${e.message}")
}
}
+
+    private fun String.asJdbc(database : String) : String {
+        return "jdbc:postgresql://$this/$database"
+    }
+
}
\ No newline at end of file
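
Note: with the constructor parameters gone, every connection detail now comes from `database.toml`, and `asJdbc` only splices the address and database name into a JDBC URL. A minimal sketch of the expected expansion, using the values from `database.toml`:

```kotlin
// Same shape as the private helper in PostgresqlDB.kt.
fun String.asJdbc(database: String): String = "jdbc:postgresql://$this/$database"

fun main() {
    println("127.0.0.1:5432".asJdbc("opendc")) // jdbc:postgresql://127.0.0.1:5432/opendc
}
```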
diff --git a/opendc-common/src/main/resources/consumer.toml b/opendc-common/src/main/resources/consumer.toml
new file mode 100644
index 00000000..0f4a2d1d
--- /dev/null
+++ b/opendc-common/src/main/resources/consumer.toml
@@ -0,0 +1,6 @@
+"bootstrap.servers" = "127.0.0.1:9092"
+"key.deserializer" = "org.apache.kafka.common.serialization.VoidDeserializer"
+"value.deserializer" = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"
+"group.id" = "connect-postgresql-sink"
+"enable.auto.commit" = "true"
+"auto.commit.interval.ms" = "1000"
diff --git a/opendc-common/src/main/resources/database.toml b/opendc-common/src/main/resources/database.toml
new file mode 100644
index 00000000..35e1d159
--- /dev/null
+++ b/opendc-common/src/main/resources/database.toml
@@ -0,0 +1,5 @@
+"address" = "127.0.0.1:5432"
+"username" = "matt"
+"password" = "admin"
+"database" = "opendc"
+"table" = "postgres_topic" \ No newline at end of file
diff --git a/opendc-common/src/main/resources/log4j2.xml b/opendc-common/src/main/resources/log4j2.xml
index 07389360..d79ec204 100644
--- a/opendc-common/src/main/resources/log4j2.xml
+++ b/opendc-common/src/main/resources/log4j2.xml
@@ -29,6 +29,7 @@
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
</Console>
</Appenders>
+
<Loggers>
<Logger name="org.opendc" level="warn" additivity="false">
<AppenderRef ref="Console"/>
diff --git a/opendc-common/src/main/resources/producer.toml b/opendc-common/src/main/resources/producer.toml
new file mode 100644
index 00000000..33a09284
--- /dev/null
+++ b/opendc-common/src/main/resources/producer.toml
@@ -0,0 +1,5 @@
+"bootstrap.servers" = "127.0.0.1:9092"
+"key.serializer" = "org.apache.kafka.common.serialization.VoidSerializer"
+"value.serializer" = "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"
+"schema.registry.url" = "http://localhost:8081"
+"auto.register.schemas" = "true" \ No newline at end of file
diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto
new file mode 100644
index 00000000..d0aa18d5
--- /dev/null
+++ b/opendc-common/src/main/resources/schema.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+
+package proto;
+
+option java_package = "org.opendc.common";
+option java_outer_classname = "ProtobufMetrics";
+
+message ProtoExport {
+ string timestamp = 1;
+ string host_id = 2;
+ int32 tasksactive = 3;
+ double cpuutilization = 4;
+ double energyusage = 5;
+}
\ No newline at end of file
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
index bf8b7825..a4c4209c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt
@@ -48,13 +48,14 @@ public class SmartScheduler() : ComputeScheduler {
// You need to specify how much time you have to make the prediction between receiving a task and putting it onto a host
return SchedulingResult(SchedulingResultType.EMPTY)
}
+
// Benefits of a digital twin: during operations you make sure what is happening in the real world.
// The use-case is making split-second automated decisions before operators can make them.
// Make a strong case for making a Digital Twin.
override fun removeTask(
task: ServiceTask,
- host: HostView?
+ host: HostView?,
) {
TODO("Not yet implemented")
}
-}
\ No newline at end of file
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
index d0be10db..5dfa21c5 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt
@@ -1,50 +1,57 @@
-package org.opendc.compute.simulator.telemetry
+/*
+ * Copyright (c) 2026 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package org.opendc.compute.simulator.telemetry
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.opendc.common.utils.Config
-import org.opendc.common.utils.Kafka
import org.opendc.common.ProtobufMetrics
-
+import org.opendc.common.utils.Kafka
import org.opendc.compute.simulator.telemetry.table.host.HostTableReader
/**
- * @author Mateusz
* This class logs data from the simulator into Kafka.
+ * The data uses the Protobuf format.
+ *
+ * @author Mateusz Kwiatkowski
+ *
+ * @see <a href=https://protobuf.dev/getting-started/javatutorial/>
+ * https://protobuf.dev/getting-started/javatutorial/</a>
*/
-public class KafkaComputeMonitor: ComputeMonitor {
- private val metrics : MonitoringMetrics = MonitoringMetrics()
- private val kafka : Kafka? = Config.getKafkaInstance()
+public class KafkaComputeMonitor : ComputeMonitor {
+ private val send = Kafka("postgres_topic").getSend()
@Override
override fun record(reader: HostTableReader) {
- metrics.id += 1
- metrics.timestamp = reader.timestamp.toEpochMilli()
- metrics.tasksActive = reader.tasksActive
- metrics.clusterName = reader.hostInfo.clusterName
-
- try{
- val packet = ProtobufMetrics.ProtoExport.newBuilder()
- .setId(metrics.id)
- .setTasksactive(metrics.tasksActive)
- .build()
- kafka?.send(packet)
- }
-
- catch(e: Exception){
+ try {
+ val packet =
+ ProtobufMetrics.ProtoExport.newBuilder()
+ .setTimestamp(reader.timestamp.toEpochMilli().toString())
+ .setHostId(reader.hostInfo.name)
+ .setTasksactive(reader.tasksActive)
+ .setCpuutilization(reader.cpuUtilization)
+ .setEnergyusage(reader.energyUsage)
+ .build()
+ this.send(packet)
+ } catch (e: Exception) {
println("${e.message}")
}
}
-
-}
-
-/**
- * @author Mateusz
- * This serves as editable data class for ObjectMapper().
- */
-public class MonitoringMetrics {
- public var id: Int = 0
- public var timestamp: Long = 0
- public var tasksActive : Int = 0
- public var clusterName: String = ""
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
index 96071833..78ce6158 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt
@@ -30,7 +30,6 @@ import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.file
import org.opendc.common.utils.Config
import org.opendc.common.utils.ConfigReader
-import org.opendc.common.utils.Kafka
import org.opendc.common.utils.PostgresqlDB
import org.opendc.experiments.base.experiment.getExperiment
import java.io.File
@@ -47,11 +46,9 @@ public fun main(args: Array<String>) {
else ExperimentListener().main(args)
}
-
-
/**
- * @author Mateusz
* Opens a client socket from `config`, but otherwise works as before.
+ * @author Mateusz
*/
internal class ExperimentCommand : CliktCommand(name = "experiment") {
private val configPath by option("--config-path", help = "path to config file")
@@ -68,9 +65,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
var clientSocket : Socket? = null
try {
- clientSocket = Socket(config.address, config.port)
Config.setConfigSocket(clientSocket)
- Config.setKafkaInstance(Kafka(config.topic, config.address, config.kafka))
val experiment = getExperiment(experimentPath)
runExperiment(experiment)
@@ -84,8 +79,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") {
}
/**
- * @author Mateusz
* Creates a server socket and database connection from `config`.
+ * @author Mateusz
*/
internal class ExperimentListener: CliktCommand(name = "listener") {
private val configPath by option("--config-path", help = "path to config file")
@@ -96,9 +91,8 @@ internal class ExperimentListener: CliktCommand(name = "listener") {
val configReader = ConfigReader()
var serverSocket: ServerSocket? = null
val config = configReader.read(configPath)
- Config.setDB(PostgresqlDB(config.address, config.postgresql, config.database, config.username, config.password))
-
try {
+
val inetAddress = InetAddress.getByName(config.address)
serverSocket = ServerSocket(config.port, config.backlog, inetAddress)
runListener(serverSocket)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
index 9fee6cf9..3867a9f0 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt
@@ -25,11 +25,11 @@ package org.opendc.experiments.base.runner
import me.tongfei.progressbar.ProgressBarBuilder
import me.tongfei.progressbar.ProgressBarStyle
import org.opendc.common.utils.Config
+import org.opendc.common.utils.PostgresqlDB
import org.opendc.experiments.base.experiment.Scenario
import java.io.IOException
import java.net.ServerSocket
import java.net.Socket
-import org.opendc.demo.runRequest
/**
* Run scenario when no pool is available for parallel execution
@@ -71,16 +71,19 @@ public fun runExperiment(experiment: List<Scenario>) {
*/
public fun runListener(socket: ServerSocket) {
var client : Socket? = null
+ val db = PostgresqlDB()
try {
client = socket.accept()
Config.setConfigSocket(client)
- val request = ByteArray(1024)
+ // val request = ByteArray(1024)
while(true){
+/*
val ret : Int? = Config.getConfigReader()?.read(request)
if(ret == -1) break
if(ret != null && ret > 0) runRequest(String(request, 0, ret))
- }
+
+ */ }
} catch (e: IOException) {
println("${e.message}")
diff --git a/output/greenifier-demo-scaling/trackr.json b/output/greenifier-demo-scaling/trackr.json
index f5113dfd..a595beff 100644
--- a/output/greenifier-demo-scaling/trackr.json
+++ b/output/greenifier-demo-scaling/trackr.json
@@ -17,44 +17,4 @@
"service": true
}
}
-},
-{
- "name": "greenifier-demo-scaling",
- "topology": {
- "pathToFile": "resources/topologies/surf_medium.json"
- },
- "workload": {
- "pathToFile": "resources/workloads/surf_month",
- "type": "ComputeWorkload"
- },
- "exportModel": {
- "exportInterval": 3600,
- "filesToExportDict": {
- "host": true,
- "task": true,
- "powerSource": true,
- "battery": true,
- "service": true
- }
- }
-},
-{
- "name": "greenifier-demo-scaling",
- "topology": {
- "pathToFile": "resources/topologies/surf_small.json"
- },
- "workload": {
- "pathToFile": "resources/workloads/surf_month",
- "type": "ComputeWorkload"
- },
- "exportModel": {
- "exportInterval": 3600,
- "filesToExportDict": {
- "host": true,
- "task": true,
- "powerSource": true,
- "battery": true,
- "service": true
- }
- }
}]
\ No newline at end of file
diff --git a/resources/experiments/config.json b/resources/experiments/config.json
index 9527923d..ae383f45 100644
--- a/resources/experiments/config.json
+++ b/resources/experiments/config.json
@@ -7,6 +7,7 @@
"username" : "matt",
"password" : "admin",
"database" : "opendc",
- "topic" : "postgres-topic",
+ "topic" : "postgres_topic",
"kafka" : "9092"
}
+
diff --git a/resources/experiments/config.toml b/resources/experiments/config.toml
new file mode 100644
index 00000000..b9ff2b38
--- /dev/null
+++ b/resources/experiments/config.toml
@@ -0,0 +1,11 @@
+# use log4j to log your results in the form of a log
+
+
+# use check instead of require
+# use type-aliases
+# use higher-order functions
+# annotation classes
+# operator objects - Factory Design Pattern
+# if it can be modeled as an instance then you do not need functions
+# make your kotlin program write to standalone
+# latex files that create figures themselves instead of using python
diff --git a/resources/experiments/experiment_scaling.json b/resources/experiments/experiment_scaling.json
index 1051389d..ce0b917c 100644
--- a/resources/experiments/experiment_scaling.json
+++ b/resources/experiments/experiment_scaling.json
@@ -3,13 +3,8 @@
"topologies": [
{
"pathToFile": "resources/topologies/surf.json"
- },
- {
- "pathToFile": "resources/topologies/surf_medium.json"
- },
- {
- "pathToFile": "resources/topologies/surf_small.json"
- }],
+ }
+ ],
"workloads": [
{
"pathToFile": "resources/workloads/surf_month",
diff --git a/resources/experiments/schema.proto b/resources/experiments/schema.proto
deleted file mode 100644
index 2a308edd..00000000
--- a/resources/experiments/schema.proto
+++ /dev/null
@@ -1,11 +0,0 @@
-syntax = "proto2";
-
-package proto;
-
-option java_package = "org.opendc.common";
-option java_outer_classname = "ProtobufMetrics";
-
-message ProtoExport {
- required int32 id = 1;
- required int32 tasksactive = 2;
-}
\ No newline at end of file
diff --git a/resources/experiments/sink-jdbc.properties b/resources/experiments/sink-jdbc.properties
index 4a78b2ed..a36dad31 100644
--- a/resources/experiments/sink-jdbc.properties
+++ b/resources/experiments/sink-jdbc.properties
@@ -25,7 +25,7 @@ value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
# The topics to consume from - required for sink connectors like this one
-topics=postgres-topic
+topics=postgres_topic
# Configuration specific to the JDBC sink connector.
# We want to connect to a Postgres database stored in the file test.db and auto-create tables.
@@ -34,6 +34,7 @@ connection.url=jdbc:postgresql://127.0.0.1:5432/opendc
connection.user=matt
connection.password=admin
auto.create=true
+auto.evolve=true
# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
@@ -43,4 +44,4 @@ auto.create=true
# Here are some values that enable JSON formatted files to be ingested by Postgresql
insert.mode=insert
-
+table.name.format=postgres_topic
\ No newline at end of file
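
Note: with `auto.create=true`, `auto.evolve=true`, and `table.name.format=postgres_topic`, the sink creates and widens the `postgres_topic` table to match the Protobuf schema. A throwaway sketch for spot-checking ingested rows, assuming the sink named the columns after the Protobuf fields and using the credentials above (the Postgres JDBC driver must be on the classpath):

```kotlin
import java.sql.DriverManager

fun main() {
    DriverManager.getConnection(
        "jdbc:postgresql://127.0.0.1:5432/opendc", "matt", "admin",
    ).use { conn ->
        val rows = conn.createStatement()
            .executeQuery("SELECT timestamp, host_id, tasksactive FROM postgres_topic LIMIT 5")
        while (rows.next()) {
            println("${rows.getString(1)} ${rows.getString(2)} ${rows.getInt(3)}")
        }
    }
}
```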
diff --git a/resources/topologies/surf_small_solution.json b/resources/topologies/surf_small_solution.json
deleted file mode 100644
index 8c3d8ce3..00000000
--- a/resources/topologies/surf_small_solution.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
- "clusters":
- [
- {
- "name": "C01",
- "hosts" :
- [
- {
- "name": "H01",
- "cpu": {
- "coreCount": 16,
- "coreSpeed": 2100
- },
- "memory": {
- "memorySize": 128000000
- },
- "count": 100
- }
- ],
- "powerSource": {
- "carbonTracePath": "resources/carbon_traces/NL_2021-2024.parquet"
- }
- }
- ]
-}
\ No newline at end of file