diff options
Diffstat (limited to 'opendc-common')
5 files changed, 870 insertions, 0 deletions
diff --git a/opendc-common/build.gradle.kts b/opendc-common/build.gradle.kts index aeb9bc4d..4d9e8b54 100644 --- a/opendc-common/build.gradle.kts +++ b/opendc-common/build.gradle.kts @@ -1,3 +1,4 @@ + /* * Copyright (c) 2020 AtLarge Research * @@ -29,9 +30,18 @@ plugins { kotlin("plugin.serialization") version "1.9.22" } +repositories { + maven(url = "https://packages.confluent.io/maven/") +} + + val serializationVersion = "1.6.0" dependencies { + + //@Mateusz: for the postgresql database + implementation("org.postgresql:postgresql:42.7.10") + api(libs.kotlinx.coroutines) implementation(libs.kotlin.logging) implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion") @@ -41,4 +51,17 @@ dependencies { api(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) + + // Source: https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients + implementation("org.apache.kafka:kafka-clients:4.1.1") + implementation(libs.jackson.core) + + // Source: https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java + // @Mateusz crucial this is an _api_ and not _implementation_ + api("com.google.protobuf:protobuf-java:4.33.5") + + // Source: https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer + implementation("io.confluent:kafka-protobuf-serializer:8.1.1") } + + diff --git a/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java new file mode 100644 index 00000000..0ec97cd0 --- /dev/null +++ b/opendc-common/src/main/java/org/opendc/common/ProtobufMetrics.java @@ -0,0 +1,638 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// NO CHECKED-IN PROTOBUF GENCODE +// source: schema.proto +// Protobuf Java Version: 4.33.1 + +package org.opendc.common; + +@com.google.protobuf.Generated +public final class ProtobufMetrics extends com.google.protobuf.GeneratedFile { + private ProtobufMetrics() {} + static { + com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( + com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, + /* major= */ 4, + /* minor= */ 33, + /* patch= */ 1, + /* suffix= */ "", + "ProtobufMetrics"); + } + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + public interface ProtoExportOrBuilder extends + // @@protoc_insertion_point(interface_extends:proto.ProtoExport) + com.google.protobuf.MessageOrBuilder { + + /** + * <code>required int32 id = 1;</code> + * @return Whether the id field is set. + */ + boolean hasId(); + /** + * <code>required int32 id = 1;</code> + * @return The id. + */ + int getId(); + + /** + * <code>required int32 tasksactive = 2;</code> + * @return Whether the tasksactive field is set. + */ + boolean hasTasksactive(); + /** + * <code>required int32 tasksactive = 2;</code> + * @return The tasksactive. + */ + int getTasksactive(); + } + /** + * Protobuf type {@code proto.ProtoExport} + */ + public static final class ProtoExport extends + com.google.protobuf.GeneratedMessage implements + // @@protoc_insertion_point(message_implements:proto.ProtoExport) + ProtoExportOrBuilder { + private static final long serialVersionUID = 0L; + static { + com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( + com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, + /* major= */ 4, + /* minor= */ 33, + /* patch= */ 1, + /* suffix= */ "", + "ProtoExport"); + } + // Use ProtoExport.newBuilder() to construct. + private ProtoExport(com.google.protobuf.GeneratedMessage.Builder<?> builder) { + super(builder); + } + private ProtoExport() { + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendc.common.ProtobufMetrics.internal_static_proto_ProtoExport_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendc.common.ProtobufMetrics.internal_static_proto_ProtoExport_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendc.common.ProtobufMetrics.ProtoExport.class, org.opendc.common.ProtobufMetrics.ProtoExport.Builder.class); + } + + private int bitField0_; + public static final int ID_FIELD_NUMBER = 1; + private int id_ = 0; + /** + * <code>required int32 id = 1;</code> + * @return Whether the id field is set. + */ + @java.lang.Override + public boolean hasId() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * <code>required int32 id = 1;</code> + * @return The id. + */ + @java.lang.Override + public int getId() { + return id_; + } + + public static final int TASKSACTIVE_FIELD_NUMBER = 2; + private int tasksactive_ = 0; + /** + * <code>required int32 tasksactive = 2;</code> + * @return Whether the tasksactive field is set. + */ + @java.lang.Override + public boolean hasTasksactive() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * <code>required int32 tasksactive = 2;</code> + * @return The tasksactive. + */ + @java.lang.Override + public int getTasksactive() { + return tasksactive_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + if (!hasId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasTasksactive()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) != 0)) { + output.writeInt32(1, id_); + } + if (((bitField0_ & 0x00000002) != 0)) { + output.writeInt32(2, tasksactive_); + } + getUnknownFields().writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) != 0)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, id_); + } + if (((bitField0_ & 0x00000002) != 0)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(2, tasksactive_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.opendc.common.ProtobufMetrics.ProtoExport)) { + return super.equals(obj); + } + org.opendc.common.ProtobufMetrics.ProtoExport other = (org.opendc.common.ProtobufMetrics.ProtoExport) obj; + + if (hasId() != other.hasId()) return false; + if (hasId()) { + if (getId() + != other.getId()) return false; + } + if (hasTasksactive() != other.hasTasksactive()) return false; + if (hasTasksactive()) { + if (getTasksactive() + != other.getTasksactive()) return false; + } + if (!getUnknownFields().equals(other.getUnknownFields())) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (hasId()) { + hash = (37 * hash) + ID_FIELD_NUMBER; + hash = (53 * hash) + getId(); + } + if (hasTasksactive()) { + hash = (37 * hash) + TASKSACTIVE_FIELD_NUMBER; + hash = (53 * hash) + getTasksactive(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input, extensionRegistry); + } + + public static org.opendc.common.ProtobufMetrics.ProtoExport parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseDelimitedWithIOException(PARSER, input); + } + + public static org.opendc.common.ProtobufMetrics.ProtoExport parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input); + } + public static org.opendc.common.ProtobufMetrics.ProtoExport parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessage + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(org.opendc.common.ProtobufMetrics.ProtoExport prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code proto.ProtoExport} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder<Builder> implements + // @@protoc_insertion_point(builder_implements:proto.ProtoExport) + org.opendc.common.ProtobufMetrics.ProtoExportOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendc.common.ProtobufMetrics.internal_static_proto_ProtoExport_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendc.common.ProtobufMetrics.internal_static_proto_ProtoExport_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendc.common.ProtobufMetrics.ProtoExport.class, org.opendc.common.ProtobufMetrics.ProtoExport.Builder.class); + } + + // Construct using org.opendc.common.ProtobufMetrics.ProtoExport.newBuilder() + private Builder() { + + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + + } + @java.lang.Override + public Builder clear() { + super.clear(); + bitField0_ = 0; + id_ = 0; + tasksactive_ = 0; + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.opendc.common.ProtobufMetrics.internal_static_proto_ProtoExport_descriptor; + } + + @java.lang.Override + public org.opendc.common.ProtobufMetrics.ProtoExport getDefaultInstanceForType() { + return org.opendc.common.ProtobufMetrics.ProtoExport.getDefaultInstance(); + } + + @java.lang.Override + public org.opendc.common.ProtobufMetrics.ProtoExport build() { + org.opendc.common.ProtobufMetrics.ProtoExport result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public org.opendc.common.ProtobufMetrics.ProtoExport buildPartial() { + org.opendc.common.ProtobufMetrics.ProtoExport result = new org.opendc.common.ProtobufMetrics.ProtoExport(this); + if (bitField0_ != 0) { buildPartial0(result); } + onBuilt(); + return result; + } + + private void buildPartial0(org.opendc.common.ProtobufMetrics.ProtoExport result) { + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) != 0)) { + result.id_ = id_; + to_bitField0_ |= 0x00000001; + } + if (((from_bitField0_ & 0x00000002) != 0)) { + result.tasksactive_ = tasksactive_; + to_bitField0_ |= 0x00000002; + } + result.bitField0_ |= to_bitField0_; + } + + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.opendc.common.ProtobufMetrics.ProtoExport) { + return mergeFrom((org.opendc.common.ProtobufMetrics.ProtoExport)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.opendc.common.ProtobufMetrics.ProtoExport other) { + if (other == org.opendc.common.ProtobufMetrics.ProtoExport.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasTasksactive()) { + setTasksactive(other.getTasksactive()); + } + this.mergeUnknownFields(other.getUnknownFields()); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + if (!hasId()) { + return false; + } + if (!hasTasksactive()) { + return false; + } + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + id_ = input.readInt32(); + bitField0_ |= 0x00000001; + break; + } // case 8 + case 16: { + tasksactive_ = input.readInt32(); + bitField0_ |= 0x00000002; + break; + } // case 16 + default: { + if (!super.parseUnknownField(input, extensionRegistry, tag)) { + done = true; // was an endgroup tag + } + break; + } // default: + } // switch (tag) + } // while (!done) + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.unwrapIOException(); + } finally { + onChanged(); + } // finally + return this; + } + private int bitField0_; + + private int id_ ; + /** + * <code>required int32 id = 1;</code> + * @return Whether the id field is set. + */ + @java.lang.Override + public boolean hasId() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * <code>required int32 id = 1;</code> + * @return The id. + */ + @java.lang.Override + public int getId() { + return id_; + } + /** + * <code>required int32 id = 1;</code> + * @param value The id to set. + * @return This builder for chaining. + */ + public Builder setId(int value) { + + id_ = value; + bitField0_ |= 0x00000001; + onChanged(); + return this; + } + /** + * <code>required int32 id = 1;</code> + * @return This builder for chaining. + */ + public Builder clearId() { + bitField0_ = (bitField0_ & ~0x00000001); + id_ = 0; + onChanged(); + return this; + } + + private int tasksactive_ ; + /** + * <code>required int32 tasksactive = 2;</code> + * @return Whether the tasksactive field is set. + */ + @java.lang.Override + public boolean hasTasksactive() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * <code>required int32 tasksactive = 2;</code> + * @return The tasksactive. + */ + @java.lang.Override + public int getTasksactive() { + return tasksactive_; + } + /** + * <code>required int32 tasksactive = 2;</code> + * @param value The tasksactive to set. + * @return This builder for chaining. + */ + public Builder setTasksactive(int value) { + + tasksactive_ = value; + bitField0_ |= 0x00000002; + onChanged(); + return this; + } + /** + * <code>required int32 tasksactive = 2;</code> + * @return This builder for chaining. + */ + public Builder clearTasksactive() { + bitField0_ = (bitField0_ & ~0x00000002); + tasksactive_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:proto.ProtoExport) + } + + // @@protoc_insertion_point(class_scope:proto.ProtoExport) + private static final org.opendc.common.ProtobufMetrics.ProtoExport DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new org.opendc.common.ProtobufMetrics.ProtoExport(); + } + + public static org.opendc.common.ProtobufMetrics.ProtoExport getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser<ProtoExport> + PARSER = new com.google.protobuf.AbstractParser<ProtoExport>() { + @java.lang.Override + public ProtoExport parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + Builder builder = newBuilder(); + try { + builder.mergeFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(builder.buildPartial()); + } catch (com.google.protobuf.UninitializedMessageException e) { + throw e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial()); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException(e) + .setUnfinishedMessage(builder.buildPartial()); + } + return builder.buildPartial(); + } + }; + + public static com.google.protobuf.Parser<ProtoExport> parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser<ProtoExport> getParserForType() { + return PARSER; + } + + @java.lang.Override + public org.opendc.common.ProtobufMetrics.ProtoExport getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_proto_ProtoExport_descriptor; + private static final + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_proto_ProtoExport_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\014schema.proto\022\005proto\".\n\013ProtoExport\022\n\n\002" + + "id\030\001 \002(\005\022\023\n\013tasksactive\030\002 \002(\005B$\n\021org.ope" + + "ndc.commonB\017ProtobufMetrics" + }; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }); + internal_static_proto_ProtoExport_descriptor = + getDescriptor().getMessageType(0); + internal_static_proto_ProtoExport_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_proto_ProtoExport_descriptor, + new java.lang.String[] { "Id", "Tasksactive", }); + descriptor.resolveAllFeaturesImmutable(); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt new file mode 100644 index 00000000..e6f18da5 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt @@ -0,0 +1,97 @@ +package org.opendc.common.utils + +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.decodeFromStream + +import java.io.File +import java.io.IOException +import java.io.InputStream +import java.io.OutputStream +import java.net.Socket +import java.sql.Connection + +/** + * @author Mateusz + * @property name + * @property backlog the amount of connections to accept + * @property address IPv4 address + * @property port + * @property postgresql Postgresql port + * @property username Postgresql user + * @property password Postgresql password + * @property database Postgresql database + * @property topic Kafka topic + * @property kafka Kafka port + */ +@Serializable +public data class Config( + val name: String = "", + var backlog: Int = 0, + val address: String = "", + val port: Int = 0, + val postgresql: Int = 0, + val username : String = "", + val password : String = "", + val database: String = "", + val topic : String = "", + val kafka: Int = 0, +){ + + public companion object{ + public var input: InputStream? = null + public var output: OutputStream? = null + public var connection : Connection? = null + public var kafka : Kafka? = null + public var database : PostgresqlDB? = null + + public var socket: Socket? = null + + public fun setConfigSocket(socket: Socket?){ + this.socket = socket + try { + input = socket?.getInputStream() + output = socket?.getOutputStream() + } catch (e: IOException){ + print("${e.message}") + } + } + + public fun getConfigReader() : InputStream? { + return input + } + + public fun getConfigWriter() : OutputStream? { + return output + } + + public fun setKafkaInstance(kafka : Kafka) { + this.kafka = kafka + } + + public fun getKafkaInstance() : Kafka? { + return this.kafka + } + + public fun setDB(db : PostgresqlDB){ + this.database = db + } + + public fun getDB() : PostgresqlDB?{ + return this.database + } + } +} +/** + * @author Mateusz + * Reads `config.json` into Config data class. + */ +public class ConfigReader { + private val jsonReader = Json + public fun read(file: File): Config = read(file.inputStream()) + @OptIn(ExperimentalSerializationApi::class) + public fun read(input: InputStream): Config { + return jsonReader.decodeFromStream<Config>(input) + } +} diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt new file mode 100644 index 00000000..48590d9f --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -0,0 +1,41 @@ +package org.opendc.common.utils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.opendc.common.ProtobufMetrics + +import java.util.* + +public class Kafka ( + private val topic : String, + address : String, + port : Int, +) { + private val servers : String = "$address:$port" + private var properties: Properties? = null + private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null + + init { + properties = Properties() + properties?.put("bootstrap.servers", servers) + properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") + properties?.put("schema.registry.url", "http://localhost:8081") + properties?.put("auto.register.schemas", "true") + + try { + producer = KafkaProducer(properties) + } catch (e: Exception){ + println("${e.message}") + } + } + + public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? { + return this.producer + } + + public fun send(value : ProtobufMetrics.ProtoExport){ + producer?.send(ProducerRecord(this.topic, value)) + } + +}
\ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt new file mode 100644 index 00000000..69314ef3 --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt @@ -0,0 +1,71 @@ +package org.opendc.common.utils + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.SQLException + +/** + * Represents the Postgresql database. + * On setup cleans the entire database and creates empty tables. + * + * @author Mateusz + * + * @param address ipv4 address + * @param port postgres post + * @param dbName database name + * @param user + * @param password + */ +public class PostgresqlDB( + address : String, + port : Int, + dbName : String, + private var user : String, + private var password : String, +) { + private var connection : Connection? = null + private var dbUrl : String = "" + + init { + dbUrl = "jdbc:postgresql://$address:$port/$dbName" + println(dbUrl) + try { + connection = DriverManager.getConnection(dbUrl, user, password) + clear() + setup() + } catch (e: SQLException) { + print("${e.message}") + } + } + public fun setup(){ + val CREATE_TABLE = """ + CREATE TABLE metrics ( + id SERIAL PRIMARY KEY, + timestamp bigint, + tasksActive integer, + clusterName varchar(10)); + """.trimIndent() + + try { + val conn = DriverManager.getConnection(dbUrl, user, password) + val st = conn.createStatement() + st.executeQuery(CREATE_TABLE) + } catch (e: SQLException){ + println("${e.message}") + } + } + + public fun clear(){ + val DELETE_ALL_TABLES = """ + DROP SCHEMA public CASCADE; + CREATE SCHEMA public; + """.trimIndent() + try { + val conn = DriverManager.getConnection(dbUrl, user, password) + val st = conn.createStatement() + st.executeQuery(DELETE_ALL_TABLES) + } catch (e: SQLException){ + println("${e.message}") + } + } +}
\ No newline at end of file |
