diff options
| author | mjkwiatkowski <mati.rewa@gmail.com> | 2026-02-20 16:17:39 +0100 |
|---|---|---|
| committer | mjkwiatkowski <mati.rewa@gmail.com> | 2026-02-20 16:17:39 +0100 |
| commit | f5da60e4275ca1172128c3994298691e12d5e1f8 (patch) | |
| tree | 189804251bf88bf390e1c9ffb4472b7a798d7f22 | |
| parent | 2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (diff) | |
22 files changed, 747 insertions, 290 deletions
@@ -1,6 +1,8 @@ ### Dependencies Paths are hardcoded. +Be aware the project will only work with this OpenDC as there are hidden Maven dependencies. +Kafka topic should be named `postgres_topic`. Confluent local (see https://www.confluent.io/installation/): @@ -8,10 +10,9 @@ Confluent local (see https://www.confluent.io/installation/): export CONFLUENT_HOME=/opt/confluent export PATH=/opt/confluent/bin:$PATH cd /opt/confluent -kafka-storage.sh random-uuid -kafka-storage.sh format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c etc/kafka/server.properties -kafka-server-start.sh etc/kafka/server.properties -kafka-server-start etc/kafka/server.properties +kafka-storage random-uuid +kafka-storage format -t 2vi2WtHxQAOPyXb1Bj1Jvw -c $CONFLUENT_HOME/etc/kafka/server.properties --standalone +kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/share/confluent-common/connectors/sink-jdbc.properties ``` @@ -20,8 +21,8 @@ Confluent JDBC sink and source (includes Postgres connector) (see https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) Be mindful to configure the right `plugin.path` in `etc/kafka/connect-standalone.properties` ```bash -ln -s $HOME/git/opendc/resources/experiments/sink-jdbc.properties opt/confluent/share/confluent-common/connectors/sink-jdbc.properties -``` +ln -s /home/matt/git/opendc/resources/experiments/sink-jdbc.properties /opt/confluent/share/confluent-common/connectors +``` Protobuf: @@ -31,6 +32,7 @@ You need to run this each time you change `schema.proto` ```bash cd resources/experiments/ protoc --java_out=/home/matt/git/opendc/opendc-common/src/main/java/ schema.proto +curl -X DELETE http://localhost:8081/subjects/postgres-topic-value ``` Postgresql: @@ -45,11 +47,10 @@ touch .s.PGSQL.5432.lock chown -R postgres:postgres /run/postgresql ``` - Random ```bash -bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres-topic -bin/kafka-topics.sh --create --topic postgres-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 +bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic postgres_topic +bin/kafka-topics.sh --create --topic postgres_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --list --bootstrap-server localhost:9092 -bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres-topic --from-beginning -```
\ No newline at end of file +bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic postgres_topic --from-beginning +``` diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts index 4d9e8b54..0c87303c 100644 --- a/opendc-common/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -62,6 +62,9 @@ dependencies { // Source: https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer implementation("io.confluent:kafka-protobuf-serializer:8.1.1") + + // Source: https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-toml + implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-toml:2.21.0") } diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java index 0ec97cd0..7f7a3874 100644 --- a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java +++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java @@ -31,26 +31,71 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { com.google.protobuf.MessageOrBuilder { /** - * <code>required int32 id = 1;</code> - * @return Whether the id field is set. + * <code>required string timestamp = 1;</code> + * @return Whether the timestamp field is set. */ - boolean hasId(); + boolean hasTimestamp(); /** - * <code>required int32 id = 1;</code> - * @return The id. + * <code>required string timestamp = 1;</code> + * @return The timestamp. */ - int getId(); + java.lang.String getTimestamp(); + /** + * <code>required string timestamp = 1;</code> + * @return The bytes for timestamp. + */ + com.google.protobuf.ByteString + getTimestampBytes(); /** - * <code>required int32 tasksactive = 2;</code> + * <code>required string host_id = 2;</code> + * @return Whether the hostId field is set. + */ + boolean hasHostId(); + /** + * <code>required string host_id = 2;</code> + * @return The hostId. + */ + java.lang.String getHostId(); + /** + * <code>required string host_id = 2;</code> + * @return The bytes for hostId. + */ + com.google.protobuf.ByteString + getHostIdBytes(); + + /** + * <code>required int32 tasksactive = 3;</code> * @return Whether the tasksactive field is set. */ boolean hasTasksactive(); /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return The tasksactive. */ int getTasksactive(); + + /** + * <code>required double cpuutilization = 4;</code> + * @return Whether the cpuutilization field is set. + */ + boolean hasCpuutilization(); + /** + * <code>required double cpuutilization = 4;</code> + * @return The cpuutilization. + */ + double getCpuutilization(); + + /** + * <code>required double energyusage = 5;</code> + * @return Whether the energyusage field is set. + */ + boolean hasEnergyusage(); + /** + * <code>required double energyusage = 5;</code> + * @return The energyusage. + */ + double getEnergyusage(); } /** * Protobuf type {@code proto.ProtoExport} @@ -74,6 +119,8 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { super(builder); } private ProtoExport() { + timestamp_ = ""; + hostId_ = ""; } public static final com.google.protobuf.Descriptors.Descriptor @@ -90,37 +137,116 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { } private int bitField0_; - public static final int ID_FIELD_NUMBER = 1; - private int id_ = 0; + public static final int TIMESTAMP_FIELD_NUMBER = 1; + @SuppressWarnings("serial") + private volatile java.lang.Object timestamp_ = ""; /** - * <code>required int32 id = 1;</code> - * @return Whether the id field is set. + * <code>required string timestamp = 1;</code> + * @return Whether the timestamp field is set. */ @java.lang.Override - public boolean hasId() { + public boolean hasTimestamp() { return ((bitField0_ & 0x00000001) != 0); } /** - * <code>required int32 id = 1;</code> - * @return The id. + * <code>required string timestamp = 1;</code> + * @return The timestamp. + */ + @java.lang.Override + public java.lang.String getTimestamp() { + java.lang.Object ref = timestamp_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + timestamp_ = s; + } + return s; + } + } + /** + * <code>required string timestamp = 1;</code> + * @return The bytes for timestamp. + */ + @java.lang.Override + public com.google.protobuf.ByteString + getTimestampBytes() { + java.lang.Object ref = timestamp_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timestamp_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int HOST_ID_FIELD_NUMBER = 2; + @SuppressWarnings("serial") + private volatile java.lang.Object hostId_ = ""; + /** + * <code>required string host_id = 2;</code> + * @return Whether the hostId field is set. + */ + @java.lang.Override + public boolean hasHostId() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * <code>required string host_id = 2;</code> + * @return The hostId. + */ + @java.lang.Override + public java.lang.String getHostId() { + java.lang.Object ref = hostId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + hostId_ = s; + } + return s; + } + } + /** + * <code>required string host_id = 2;</code> + * @return The bytes for hostId. */ @java.lang.Override - public int getId() { - return id_; + public com.google.protobuf.ByteString + getHostIdBytes() { + java.lang.Object ref = hostId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + hostId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } } - public static final int TASKSACTIVE_FIELD_NUMBER = 2; + public static final int TASKSACTIVE_FIELD_NUMBER = 3; private int tasksactive_ = 0; /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return Whether the tasksactive field is set. */ @java.lang.Override public boolean hasTasksactive() { - return ((bitField0_ & 0x00000002) != 0); + return ((bitField0_ & 0x00000004) != 0); } /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return The tasksactive. */ @java.lang.Override @@ -128,6 +254,44 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return tasksactive_; } + public static final int CPUUTILIZATION_FIELD_NUMBER = 4; + private double cpuutilization_ = 0D; + /** + * <code>required double cpuutilization = 4;</code> + * @return Whether the cpuutilization field is set. + */ + @java.lang.Override + public boolean hasCpuutilization() { + return ((bitField0_ & 0x00000008) != 0); + } + /** + * <code>required double cpuutilization = 4;</code> + * @return The cpuutilization. + */ + @java.lang.Override + public double getCpuutilization() { + return cpuutilization_; + } + + public static final int ENERGYUSAGE_FIELD_NUMBER = 5; + private double energyusage_ = 0D; + /** + * <code>required double energyusage = 5;</code> + * @return Whether the energyusage field is set. + */ + @java.lang.Override + public boolean hasEnergyusage() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * <code>required double energyusage = 5;</code> + * @return The energyusage. + */ + @java.lang.Override + public double getEnergyusage() { + return energyusage_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -135,7 +299,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { if (isInitialized == 1) return true; if (isInitialized == 0) return false; - if (!hasId()) { + if (!hasTimestamp()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasHostId()) { memoizedIsInitialized = 0; return false; } @@ -143,6 +311,14 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { memoizedIsInitialized = 0; return false; } + if (!hasCpuutilization()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasEnergyusage()) { + memoizedIsInitialized = 0; + return false; + } memoizedIsInitialized = 1; return true; } @@ -151,10 +327,19 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { if (((bitField0_ & 0x00000001) != 0)) { - output.writeInt32(1, id_); + com.google.protobuf.GeneratedMessage.writeString(output, 1, timestamp_); } if (((bitField0_ & 0x00000002) != 0)) { - output.writeInt32(2, tasksactive_); + com.google.protobuf.GeneratedMessage.writeString(output, 2, hostId_); + } + if (((bitField0_ & 0x00000004) != 0)) { + output.writeInt32(3, tasksactive_); + } + if (((bitField0_ & 0x00000008) != 0)) { + output.writeDouble(4, cpuutilization_); + } + if (((bitField0_ & 0x00000010) != 0)) { + output.writeDouble(5, energyusage_); } getUnknownFields().writeTo(output); } @@ -166,12 +351,22 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { size = 0; if (((bitField0_ & 0x00000001) != 0)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(1, id_); + size += com.google.protobuf.GeneratedMessage.computeStringSize(1, timestamp_); } if (((bitField0_ & 0x00000002) != 0)) { + size += com.google.protobuf.GeneratedMessage.computeStringSize(2, hostId_); + } + if (((bitField0_ & 0x00000004) != 0)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, tasksactive_); + } + if (((bitField0_ & 0x00000008) != 0)) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(2, tasksactive_); + .computeDoubleSize(4, cpuutilization_); + } + if (((bitField0_ & 0x00000010) != 0)) { + size += com.google.protobuf.CodedOutputStream + .computeDoubleSize(5, energyusage_); } size += getUnknownFields().getSerializedSize(); memoizedSize = size; @@ -188,16 +383,33 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { } org.opendc.common.ProtobufMetrics.ProtoExport other = (org.opendc.common.ProtobufMetrics.ProtoExport) obj; - if (hasId() != other.hasId()) return false; - if (hasId()) { - if (getId() - != other.getId()) return false; + if (hasTimestamp() != other.hasTimestamp()) return false; + if (hasTimestamp()) { + if (!getTimestamp() + .equals(other.getTimestamp())) return false; + } + if (hasHostId() != other.hasHostId()) return false; + if (hasHostId()) { + if (!getHostId() + .equals(other.getHostId())) return false; } if (hasTasksactive() != other.hasTasksactive()) return false; if (hasTasksactive()) { if (getTasksactive() != other.getTasksactive()) return false; } + if (hasCpuutilization() != other.hasCpuutilization()) return false; + if (hasCpuutilization()) { + if (java.lang.Double.doubleToLongBits(getCpuutilization()) + != java.lang.Double.doubleToLongBits( + other.getCpuutilization())) return false; + } + if (hasEnergyusage() != other.hasEnergyusage()) return false; + if (hasEnergyusage()) { + if (java.lang.Double.doubleToLongBits(getEnergyusage()) + != java.lang.Double.doubleToLongBits( + other.getEnergyusage())) return false; + } if (!getUnknownFields().equals(other.getUnknownFields())) return false; return true; } @@ -209,14 +421,28 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { } int hash = 41; hash = (19 * hash) + getDescriptor().hashCode(); - if (hasId()) { - hash = (37 * hash) + ID_FIELD_NUMBER; - hash = (53 * hash) + getId(); + if (hasTimestamp()) { + hash = (37 * hash) + TIMESTAMP_FIELD_NUMBER; + hash = (53 * hash) + getTimestamp().hashCode(); + } + if (hasHostId()) { + hash = (37 * hash) + HOST_ID_FIELD_NUMBER; + hash = (53 * hash) + getHostId().hashCode(); } if (hasTasksactive()) { hash = (37 * hash) + TASKSACTIVE_FIELD_NUMBER; hash = (53 * hash) + getTasksactive(); } + if (hasCpuutilization()) { + hash = (37 * hash) + CPUUTILIZATION_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getCpuutilization())); + } + if (hasEnergyusage()) { + hash = (37 * hash) + ENERGYUSAGE_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashLong( + java.lang.Double.doubleToLongBits(getEnergyusage())); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -348,8 +574,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { public Builder clear() { super.clear(); bitField0_ = 0; - id_ = 0; + timestamp_ = ""; + hostId_ = ""; tasksactive_ = 0; + cpuutilization_ = 0D; + energyusage_ = 0D; return this; } @@ -385,13 +614,25 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (((from_bitField0_ & 0x00000001) != 0)) { - result.id_ = id_; + result.timestamp_ = timestamp_; to_bitField0_ |= 0x00000001; } if (((from_bitField0_ & 0x00000002) != 0)) { - result.tasksactive_ = tasksactive_; + result.hostId_ = hostId_; to_bitField0_ |= 0x00000002; } + if (((from_bitField0_ & 0x00000004) != 0)) { + result.tasksactive_ = tasksactive_; + to_bitField0_ |= 0x00000004; + } + if (((from_bitField0_ & 0x00000008) != 0)) { + result.cpuutilization_ = cpuutilization_; + to_bitField0_ |= 0x00000008; + } + if (((from_bitField0_ & 0x00000010) != 0)) { + result.energyusage_ = energyusage_; + to_bitField0_ |= 0x00000010; + } result.bitField0_ |= to_bitField0_; } @@ -407,12 +648,25 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { public Builder mergeFrom(org.opendc.common.ProtobufMetrics.ProtoExport other) { if (other == org.opendc.common.ProtobufMetrics.ProtoExport.getDefaultInstance()) return this; - if (other.hasId()) { - setId(other.getId()); + if (other.hasTimestamp()) { + timestamp_ = other.timestamp_; + bitField0_ |= 0x00000001; + onChanged(); + } + if (other.hasHostId()) { + hostId_ = other.hostId_; + bitField0_ |= 0x00000002; + onChanged(); } if (other.hasTasksactive()) { setTasksactive(other.getTasksactive()); } + if (other.hasCpuutilization()) { + setCpuutilization(other.getCpuutilization()); + } + if (other.hasEnergyusage()) { + setEnergyusage(other.getEnergyusage()); + } this.mergeUnknownFields(other.getUnknownFields()); onChanged(); return this; @@ -420,12 +674,21 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { @java.lang.Override public final boolean isInitialized() { - if (!hasId()) { + if (!hasTimestamp()) { + return false; + } + if (!hasHostId()) { return false; } if (!hasTasksactive()) { return false; } + if (!hasCpuutilization()) { + return false; + } + if (!hasEnergyusage()) { + return false; + } return true; } @@ -445,16 +708,31 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { case 0: done = true; break; - case 8: { - id_ = input.readInt32(); + case 10: { + timestamp_ = input.readBytes(); bitField0_ |= 0x00000001; break; - } // case 8 - case 16: { - tasksactive_ = input.readInt32(); + } // case 10 + case 18: { + hostId_ = input.readBytes(); bitField0_ |= 0x00000002; break; - } // case 16 + } // case 18 + case 24: { + tasksactive_ = input.readInt32(); + bitField0_ |= 0x00000004; + break; + } // case 24 + case 33: { + cpuutilization_ = input.readDouble(); + bitField0_ |= 0x00000008; + break; + } // case 33 + case 41: { + energyusage_ = input.readDouble(); + bitField0_ |= 0x00000010; + break; + } // case 41 default: { if (!super.parseUnknownField(input, extensionRegistry, tag)) { done = true; // was an endgroup tag @@ -472,57 +750,177 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { } private int bitField0_; - private int id_ ; + private java.lang.Object timestamp_ = ""; /** - * <code>required int32 id = 1;</code> - * @return Whether the id field is set. + * <code>required string timestamp = 1;</code> + * @return Whether the timestamp field is set. */ - @java.lang.Override - public boolean hasId() { + public boolean hasTimestamp() { return ((bitField0_ & 0x00000001) != 0); } /** - * <code>required int32 id = 1;</code> - * @return The id. + * <code>required string timestamp = 1;</code> + * @return The timestamp. */ - @java.lang.Override - public int getId() { - return id_; + public java.lang.String getTimestamp() { + java.lang.Object ref = timestamp_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + timestamp_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } } /** - * <code>required int32 id = 1;</code> - * @param value The id to set. + * <code>required string timestamp = 1;</code> + * @return The bytes for timestamp. + */ + public com.google.protobuf.ByteString + getTimestampBytes() { + java.lang.Object ref = timestamp_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timestamp_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>required string timestamp = 1;</code> + * @param value The timestamp to set. * @return This builder for chaining. */ - public Builder setId(int value) { - - id_ = value; + public Builder setTimestamp( + java.lang.String value) { + if (value == null) { throw new NullPointerException(); } + timestamp_ = value; bitField0_ |= 0x00000001; onChanged(); return this; } /** - * <code>required int32 id = 1;</code> + * <code>required string timestamp = 1;</code> * @return This builder for chaining. */ - public Builder clearId() { + public Builder clearTimestamp() { + timestamp_ = getDefaultInstance().getTimestamp(); bitField0_ = (bitField0_ & ~0x00000001); - id_ = 0; + onChanged(); + return this; + } + /** + * <code>required string timestamp = 1;</code> + * @param value The bytes for timestamp to set. + * @return This builder for chaining. + */ + public Builder setTimestampBytes( + com.google.protobuf.ByteString value) { + if (value == null) { throw new NullPointerException(); } + timestamp_ = value; + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + + private java.lang.Object hostId_ = ""; + /** + * <code>required string host_id = 2;</code> + * @return Whether the hostId field is set. + */ + public boolean hasHostId() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * <code>required string host_id = 2;</code> + * @return The hostId. + */ + public java.lang.String getHostId() { + java.lang.Object ref = hostId_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + hostId_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>required string host_id = 2;</code> + * @return The bytes for hostId. + */ + public com.google.protobuf.ByteString + getHostIdBytes() { + java.lang.Object ref = hostId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + hostId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>required string host_id = 2;</code> + * @param value The hostId to set. + * @return This builder for chaining. + */ + public Builder setHostId( + java.lang.String value) { + if (value == null) { throw new NullPointerException(); } + hostId_ = value; + bitField0_ |= 0x00000002; + onChanged(); + return this; + } + /** + * <code>required string host_id = 2;</code> + * @return This builder for chaining. + */ + public Builder clearHostId() { + hostId_ = getDefaultInstance().getHostId(); + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + return this; + } + /** + * <code>required string host_id = 2;</code> + * @param value The bytes for hostId to set. + * @return This builder for chaining. + */ + public Builder setHostIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { throw new NullPointerException(); } + hostId_ = value; + bitField0_ |= 0x00000002; onChanged(); return this; } private int tasksactive_ ; /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return Whether the tasksactive field is set. */ @java.lang.Override public boolean hasTasksactive() { - return ((bitField0_ & 0x00000002) != 0); + return ((bitField0_ & 0x00000004) != 0); } /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return The tasksactive. */ @java.lang.Override @@ -530,28 +928,108 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { return tasksactive_; } /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @param value The tasksactive to set. * @return This builder for chaining. */ public Builder setTasksactive(int value) { tasksactive_ = value; - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000004; onChanged(); return this; } /** - * <code>required int32 tasksactive = 2;</code> + * <code>required int32 tasksactive = 3;</code> * @return This builder for chaining. */ public Builder clearTasksactive() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000004); tasksactive_ = 0; onChanged(); return this; } + private double cpuutilization_ ; + /** + * <code>required double cpuutilization = 4;</code> + * @return Whether the cpuutilization field is set. + */ + @java.lang.Override + public boolean hasCpuutilization() { + return ((bitField0_ & 0x00000008) != 0); + } + /** + * <code>required double cpuutilization = 4;</code> + * @return The cpuutilization. + */ + @java.lang.Override + public double getCpuutilization() { + return cpuutilization_; + } + /** + * <code>required double cpuutilization = 4;</code> + * @param value The cpuutilization to set. + * @return This builder for chaining. + */ + public Builder setCpuutilization(double value) { + + cpuutilization_ = value; + bitField0_ |= 0x00000008; + onChanged(); + return this; + } + /** + * <code>required double cpuutilization = 4;</code> + * @return This builder for chaining. + */ + public Builder clearCpuutilization() { + bitField0_ = (bitField0_ & ~0x00000008); + cpuutilization_ = 0D; + onChanged(); + return this; + } + + private double energyusage_ ; + /** + * <code>required double energyusage = 5;</code> + * @return Whether the energyusage field is set. + */ + @java.lang.Override + public boolean hasEnergyusage() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * <code>required double energyusage = 5;</code> + * @return The energyusage. + */ + @java.lang.Override + public double getEnergyusage() { + return energyusage_; + } + /** + * <code>required double energyusage = 5;</code> + * @param value The energyusage to set. + * @return This builder for chaining. + */ + public Builder setEnergyusage(double value) { + + energyusage_ = value; + bitField0_ |= 0x00000010; + onChanged(); + return this; + } + /** + * <code>required double energyusage = 5;</code> + * @return This builder for chaining. + */ + public Builder clearEnergyusage() { + bitField0_ = (bitField0_ & ~0x00000010); + energyusage_ = 0D; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:proto.ProtoExport) } @@ -617,9 +1095,11 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { descriptor; static { java.lang.String[] descriptorData = { - "\n\014schema.proto\022\005proto\".\n\013ProtoExport\022\n\n\002" + - "id\030\001 \002(\005\022\023\n\013tasksactive\030\002 \002(\005B$\n\021org.ope" + - "ndc.commonB\017ProtobufMetrics" + "\n\014schema.proto\022\005proto\"s\n\013ProtoExport\022\021\n\t" + + "timestamp\030\001 \002(\t\022\017\n\007host_id\030\002 \002(\t\022\023\n\013task" + + "sactive\030\003 \002(\005\022\026\n\016cpuutilization\030\004 \002(\001\022\023\n" + + "\013energyusage\030\005 \002(\001B$\n\021org.opendc.commonB" + + "\017ProtobufMetrics" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -630,7 +1110,7 @@ public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { internal_static_proto_ProtoExport_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_proto_ProtoExport_descriptor, - new java.lang.String[] { "Id", "Tasksactive", }); + new java.lang.String[] { "Timestamp", "HostId", "Tasksactive", "Cpuutilization", "Energyusage", }); descriptor.resolveAllFeaturesImmutable(); } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt index e6f18da5..8261f6f0 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt @@ -13,7 +13,6 @@ import java.net.Socket import java.sql.Connection /** - * @author Mateusz * @property name * @property backlog the amount of connections to accept * @property address IPv4 address @@ -22,8 +21,14 @@ import java.sql.Connection * @property username Postgresql user * @property password Postgresql password * @property database Postgresql database - * @property topic Kafka topic + * @property topic Kafka topic and database table name * @property kafka Kafka port + * @author Mateusz + */ +/* + + Use `by lazy` here. + Use design patterns - singleton. */ @Serializable public data class Config( @@ -50,6 +55,10 @@ public data class Config( public fun setConfigSocket(socket: Socket?){ this.socket = socket + // no try catch if the exception is not from Java + // do not use raw sockets, use a service for the communication + // use redis instead of HTTP GET (consider it, but not bound in stone) + // make an API KTor try { input = socket?.getInputStream() output = socket?.getOutputStream() diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt index 48590d9f..d7ccd385 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -1,41 +1,56 @@ package org.opendc.common.utils + +import com.fasterxml.jackson.dataformat.toml.TomlMapper +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.opendc.common.ProtobufMetrics - import java.util.* +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.toJavaDuration +/** + * Represents the Kafka interface. + * @constructor `topic` the Kafka topic + * @author Mateusz Kwiatkowski + * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html> + * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a> + * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html> + * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a> + */ -public class Kafka ( - private val topic : String, - address : String, - port : Int, -) { - private val servers : String = "$address:$port" - private var properties: Properties? = null - private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null +@Suppress("DEPRECATION") +public class Kafka(private val topic: String) { + private var properties : Properties init { - properties = Properties() - properties?.put("bootstrap.servers", servers) - properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") - properties?.put("schema.registry.url", "http://localhost:8081") - properties?.put("auto.register.schemas", "true") + check(!topic.contains("-")) + properties = TomlMapper().readerFor(Properties().javaClass) + .readValue(Kafka::class.java.getResource("/producer.toml")) + } - try { - producer = KafkaProducer(properties) - } catch (e: Exception){ + public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit { + val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties) + return fun (value : ProtobufMetrics.ProtoExport) { + try { + producer.send(ProducerRecord(this.topic, value)) + } catch (e: Exception) { println("${e.message}") + } } } - public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? { - return this.producer - } - - public fun send(value : ProtobufMetrics.ProtoExport){ - producer?.send(ProducerRecord(this.topic, value)) + // TODO: fix + public fun getReceive() : () -> Unit { + val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties) + return fun() : Unit { + try { + consumer.subscribe(listOf(topic)) + while (true) { + consumer.poll(1.microseconds.toJavaDuration()) + } + } catch (e: Exception) { + println("${e.message}") + } + } } - }
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt index 69314ef3..361925ee 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt @@ -1,71 +1,52 @@ package org.opendc.common.utils +import com.fasterxml.jackson.dataformat.toml.TomlMapper import java.sql.Connection import java.sql.DriverManager import java.sql.SQLException - +import java.util.Properties /** * Represents the Postgresql database. - * On setup cleans the entire database and creates empty tables. + * On setup cleans the entire database. * - * @author Mateusz + * @author Mateusz Kwiatkowski * - * @param address ipv4 address - * @param port postgres post - * @param dbName database name - * @param user - * @param password + * @see <a href=https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html> + * https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html</a> */ -public class PostgresqlDB( - address : String, - port : Int, - dbName : String, - private var user : String, - private var password : String, -) { +@Suppress("DEPRECATION") +public class PostgresqlDB { + private var properties = Properties() private var connection : Connection? = null - private var dbUrl : String = "" init { - dbUrl = "jdbc:postgresql://$address:$port/$dbName" - println(dbUrl) try { - connection = DriverManager.getConnection(dbUrl, user, password) + properties = TomlMapper().readerFor(Properties().javaClass) + .readValue(PostgresqlDB::class.java.getResource("/database.toml")) + connection = DriverManager.getConnection( + properties.getProperty("address").asJdbc(properties.getProperty("table")), + properties.getProperty("user"), + properties.getProperty("password")) clear() - setup() } catch (e: SQLException) { print("${e.message}") } } - public fun setup(){ - val CREATE_TABLE = """ - CREATE TABLE metrics ( - id SERIAL PRIMARY KEY, - timestamp bigint, - tasksActive integer, - clusterName varchar(10)); - """.trimIndent() - - try { - val conn = DriverManager.getConnection(dbUrl, user, password) - val st = conn.createStatement() - st.executeQuery(CREATE_TABLE) - } catch (e: SQLException){ - println("${e.message}") - } - } - public fun clear(){ val DELETE_ALL_TABLES = """ DROP SCHEMA public CASCADE; CREATE SCHEMA public; """.trimIndent() try { - val conn = DriverManager.getConnection(dbUrl, user, password) - val st = conn.createStatement() - st.executeQuery(DELETE_ALL_TABLES) + val st = connection?.createStatement() + st?.executeQuery(DELETE_ALL_TABLES) } catch (e: SQLException){ println("${e.message}") } } + + private fun String.asJdbc(table : String) : String { + return "jdbc:postgresql://$this/$table" + } + }
\ No newline at end of file diff --git a/opendc-common/src/main/resources/consumer.toml b/opendc-common/src/main/resources/consumer.toml new file mode 100644 index 00000000..0f4a2d1d --- /dev/null +++ b/opendc-common/src/main/resources/consumer.toml @@ -0,0 +1,6 @@ +"bootstrap.servers" = "127.0.0.1:9092" +"key.deserializer" = "org.apache.kafka.common.serialization.VoidDeserializer" +"value.deserializer" = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer" +"group.id" = "connect-postgresql-sink" +"enable.auto.commit" = "true" +"auto.commit.interval.ms" = "1000" diff --git a/opendc-common/src/main/resources/database.toml b/opendc-common/src/main/resources/database.toml new file mode 100644 index 00000000..35e1d159 --- /dev/null +++ b/opendc-common/src/main/resources/database.toml @@ -0,0 +1,5 @@ +"address" = "127.0.0.1:5432" +"username" = "matt" +"password" = "admin" +"database" = "opendc" +"table" = "postgres_topic"
\ No newline at end of file diff --git a/opendc-common/src/main/resources/log4j2.xml b/opendc-common/src/main/resources/log4j2.xml index 07389360..d79ec204 100644 --- a/opendc-common/src/main/resources/log4j2.xml +++ b/opendc-common/src/main/resources/log4j2.xml @@ -29,6 +29,7 @@ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> </Console> </Appenders> + <Loggers> <Logger name="org.opendc" level="warn" additivity="false"> <AppenderRef ref="Console"/> diff --git a/opendc-common/src/main/resources/producer.toml b/opendc-common/src/main/resources/producer.toml new file mode 100644 index 00000000..33a09284 --- /dev/null +++ b/opendc-common/src/main/resources/producer.toml @@ -0,0 +1,5 @@ +"bootstrap.servers" = "127.0.0.1:9092" +"key.serializer" = "org.apache.kafka.common.serialization.VoidSerializer" +"value.serializer" = "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer" +"schema.registry.url" = "http://localhost:8081" +"auto.register.schemas" = "true"
\ No newline at end of file diff --git a/opendc-common/src/main/resources/schema.proto b/opendc-common/src/main/resources/schema.proto new file mode 100644 index 00000000..d0aa18d5 --- /dev/null +++ b/opendc-common/src/main/resources/schema.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package proto; + +option java_package = "org.opendc.common"; +option java_outer_classname = "ProtobufMetrics"; + +message ProtoExport { + string timestamp = 1; + string host_id = 2; + int32 tasksactive = 3; + double cpuutilization = 4; + double energyusage = 5; +}
\ No newline at end of file diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt index bf8b7825..a4c4209c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt @@ -48,13 +48,14 @@ public class SmartScheduler() : ComputeScheduler { // You need to specify how much time do you have to make the prediction between receiving a time and putting onto a host return SchedulingResult(SchedulingResultType.EMPTY) } + // Benefits of a digital twin: during operations you make sure what is happening in the real world. // The use-case is making split-second automated decisions before operators can make them. // Make a strong case for making a Digital Twin. override fun removeTask( task: ServiceTask, - host: HostView? + host: HostView?, ) { TODO("Not yet implemented") } -}
\ No newline at end of file +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt index d0be10db..5dfa21c5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt @@ -1,50 +1,57 @@ -package org.opendc.compute.simulator.telemetry +/* + * Copyright (c) 2026 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package org.opendc.compute.simulator.telemetry -import com.fasterxml.jackson.databind.ObjectMapper -import org.opendc.common.utils.Config -import org.opendc.common.utils.Kafka import org.opendc.common.ProtobufMetrics - +import org.opendc.common.utils.Kafka import org.opendc.compute.simulator.telemetry.table.host.HostTableReader /** - * @author Mateusz * This class logs data from the simulator into Kafka. + * The data uses the Protobuf format. + * + * @author Mateusz Kwiatkowski + * + * @see <a href=https://protobuf.dev/getting-started/javatutorial/> + * https://protobuf.dev/getting-started/javatutorial/</a> */ -public class KafkaComputeMonitor: ComputeMonitor { - private val metrics : MonitoringMetrics = MonitoringMetrics() - private val kafka : Kafka? = Config.getKafkaInstance() +public class KafkaComputeMonitor : ComputeMonitor { + private val send = Kafka("postgres_topic").getSend() @Override override fun record(reader: HostTableReader) { - metrics.id += 1 - metrics.timestamp = reader.timestamp.toEpochMilli() - metrics.tasksActive = reader.tasksActive - metrics.clusterName = reader.hostInfo.clusterName - - try{ - val packet = ProtobufMetrics.ProtoExport.newBuilder() - .setId(metrics.id) - .setTasksactive(metrics.tasksActive) - .build() - kafka?.send(packet) - } - - catch(e: Exception){ + try { + val packet = + ProtobufMetrics.ProtoExport.newBuilder() + .setTimestamp(reader.timestamp.toEpochMilli().toString()) + .setHostId(reader.hostInfo.name) + .setTasksactive(reader.tasksActive) + .setCpuutilization(reader.cpuUtilization) + .setEnergyusage(reader.energyUsage) + .build() + this.send(packet) + } catch (e: Exception) { println("${e.message}") } } - -} - -/** - * @author Mateusz - * This serves as editable data class for ObjectMapper(). - */ -public class MonitoringMetrics { - public var id: Int = 0 - public var timestamp: Long = 0 - public var tasksActive : Int = 0 - public var clusterName: String = "" } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt index 96071833..78ce6158 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentCli.kt @@ -30,7 +30,6 @@ import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file import org.opendc.common.utils.Config import org.opendc.common.utils.ConfigReader -import org.opendc.common.utils.Kafka import org.opendc.common.utils.PostgresqlDB import org.opendc.experiments.base.experiment.getExperiment import java.io.File @@ -47,11 +46,9 @@ public fun main(args: Array<String>) { else ExperimentListener().main(args) } - - /** - * @author Mateusz * Opens a client socket from `config`, but otherwise works as before. + * @author Mateusz */ internal class ExperimentCommand : CliktCommand(name = "experiment") { private val configPath by option("--config-path", help = "path to config file") @@ -68,9 +65,7 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { var clientSocket : Socket? = null try { - clientSocket = Socket(config.address, config.port) Config.setConfigSocket(clientSocket) - Config.setKafkaInstance(Kafka(config.topic, config.address, config.kafka)) val experiment = getExperiment(experimentPath) runExperiment(experiment) @@ -84,8 +79,8 @@ internal class ExperimentCommand : CliktCommand(name = "experiment") { } /** - * @author Mateusz * Creates a server socket and database connection from `config`. + * @author Mateusz */ internal class ExperimentListener: CliktCommand(name = "listener") { private val configPath by option("--config-path", help = "path to config file") @@ -96,9 +91,8 @@ internal class ExperimentListener: CliktCommand(name = "listener") { val configReader = ConfigReader() var serverSocket: ServerSocket? = null val config = configReader.read(configPath) - Config.setDB(PostgresqlDB(config.address, config.postgresql, config.database, config.username, config.password)) - try { + val inetAddress = InetAddress.getByName(config.address) serverSocket = ServerSocket(config.port, config.backlog, inetAddress) runListener(serverSocket) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt index 9fee6cf9..3867a9f0 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ExperimentRunner.kt @@ -25,11 +25,11 @@ package org.opendc.experiments.base.runner import me.tongfei.progressbar.ProgressBarBuilder import me.tongfei.progressbar.ProgressBarStyle import org.opendc.common.utils.Config +import org.opendc.common.utils.PostgresqlDB import org.opendc.experiments.base.experiment.Scenario import java.io.IOException import java.net.ServerSocket import java.net.Socket -import org.opendc.demo.runRequest /** * Run scenario when no pool is available for parallel execution @@ -71,16 +71,19 @@ public fun runExperiment(experiment: List<Scenario>) { */ public fun runListener(socket: ServerSocket) { var client : Socket? = null + val db = PostgresqlDB() try { client = socket.accept() Config.setConfigSocket(client) - val request = ByteArray(1024) + // val request = ByteArray(1024) while(true){ +/* val ret : Int? = Config.getConfigReader()?.read(request) if(ret == -1) break if(ret != null && ret > 0) runRequest(String(request, 0, ret)) - } + + */ } } catch (e: IOException) { println("${e.message}") diff --git a/output/greenifier-demo-scaling/trackr.json b/output/greenifier-demo-scaling/trackr.json index f5113dfd..a595beff 100644 --- a/output/greenifier-demo-scaling/trackr.json +++ b/output/greenifier-demo-scaling/trackr.json @@ -17,44 +17,4 @@ "service": true } } -}, -{ - "name": "greenifier-demo-scaling", - "topology": { - "pathToFile": "resources/topologies/surf_medium.json" - }, - "workload": { - "pathToFile": "resources/workloads/surf_month", - "type": "ComputeWorkload" - }, - "exportModel": { - "exportInterval": 3600, - "filesToExportDict": { - "host": true, - "task": true, - "powerSource": true, - "battery": true, - "service": true - } - } -}, -{ - "name": "greenifier-demo-scaling", - "topology": { - "pathToFile": "resources/topologies/surf_small.json" - }, - "workload": { - "pathToFile": "resources/workloads/surf_month", - "type": "ComputeWorkload" - }, - "exportModel": { - "exportInterval": 3600, - "filesToExportDict": { - "host": true, - "task": true, - "powerSource": true, - "battery": true, - "service": true - } - } }]
\ No newline at end of file diff --git a/resources/experiments/config.json b/resources/experiments/config.json index 9527923d..ae383f45 100644 --- a/resources/experiments/config.json +++ b/resources/experiments/config.json @@ -7,6 +7,7 @@ "username" : "matt", "password" : "admin", "database" : "opendc", - "topic" : "postgres-topic", + "topic" : "postgres_topic", "kafka" : "9092" } + diff --git a/resources/experiments/config.toml b/resources/experiments/config.toml new file mode 100644 index 00000000..b9ff2b38 --- /dev/null +++ b/resources/experiments/config.toml @@ -0,0 +1,11 @@ +// use a log4j to log your results in a form of a log + + +// Use check instead of require +// use type-aliases +// use higher-order +// annotation classes +// operator objects - Factory Design Pattern +// if it can be modeled as an instance then you do not need functions +// make your kotlin program write to standalone +// latex files that create figures themselves instead of using python diff --git a/resources/experiments/experiment_scaling.json b/resources/experiments/experiment_scaling.json index 1051389d..ce0b917c 100644 --- a/resources/experiments/experiment_scaling.json +++ b/resources/experiments/experiment_scaling.json @@ -3,13 +3,8 @@ "topologies": [ { "pathToFile": "resources/topologies/surf.json" - }, - { - "pathToFile": "resources/topologies/surf_medium.json" - }, - { - "pathToFile": "resources/topologies/surf_small.json" - }], + } + ], "workloads": [ { "pathToFile": "resources/workloads/surf_month", diff --git a/resources/experiments/schema.proto b/resources/experiments/schema.proto deleted file mode 100644 index 2a308edd..00000000 --- a/resources/experiments/schema.proto +++ /dev/null @@ -1,11 +0,0 @@ -syntax = "proto2"; - -package proto; - -option java_package = "org.opendc.common"; -option java_outer_classname = "ProtobufMetrics"; - -message ProtoExport { - required int32 id = 1; - required int32 tasksactive = 2; -}
\ No newline at end of file diff --git a/resources/experiments/sink-jdbc.properties b/resources/experiments/sink-jdbc.properties index 4a78b2ed..a36dad31 100644 --- a/resources/experiments/sink-jdbc.properties +++ b/resources/experiments/sink-jdbc.properties @@ -25,7 +25,7 @@ value.converter=io.confluent.connect.protobuf.ProtobufConverter value.converter.schema.registry.url=http://localhost:8081 # The topics to consume from - required for sink connectors like this one -topics=postgres-topic +topics=postgres_topic # Configuration specific to the JDBC sink connector. # We want to connect to a Postgres database stored in the file test.db and auto-create tables. @@ -34,6 +34,7 @@ connection.url=jdbc:postgresql://127.0.0.1:5432/opendc connection.user=matt connection.password=admin auto.create=true +auto.evolve=true # Define when identifiers should be quoted in DDL and DML statements. # The default is 'always' to maintain backward compatibility with prior versions. @@ -43,4 +44,4 @@ auto.create=true # Here are some values that enable JSON formatted files to be ingested by Postgresql insert.mode=insert - +table.name.format=postgres_topic
\ No newline at end of file diff --git a/resources/topologies/surf_small_solution.json b/resources/topologies/surf_small_solution.json deleted file mode 100644 index 8c3d8ce3..00000000 --- a/resources/topologies/surf_small_solution.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "clusters": - [ - { - "name": "C01", - "hosts" : - [ - { - "name": "H01", - "cpu": { - "coreCount": 16, - "coreSpeed": 2100 - }, - "memory": { - "memorySize": 128000000 - }, - "count": 100 - } - ], - "powerSource": { - "carbonTracePath": "resources/carbon_traces/NL_2021-2024.parquet" - } - } - ] -}
\ No newline at end of file |
