From 2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Mon, 16 Feb 2026 15:18:21 +0100 Subject: feat: opendc -> kafka -> postgresql works; added protobuf encoding --- .../kotlin/org/opendc/common/utils/ConfigParser.kt | 37 +++++++++++++++---- .../main/kotlin/org/opendc/common/utils/Kafka.kt | 41 ++++++++++++++++++++++ .../kotlin/org/opendc/common/utils/PostgresqlDB.kt | 39 +++++++++++++------- 3 files changed, 99 insertions(+), 18 deletions(-) create mode 100644 opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils') diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt index cb9623bb..e6f18da5 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt @@ -11,32 +11,40 @@ import java.io.InputStream import java.io.OutputStream import java.net.Socket import java.sql.Connection -import java.sql.DriverManager -import java.sql.SQLException /** + * @author Mateusz * @property name * @property backlog the amount of connections to accept - * @property databasePath * @property address IPv4 address * @property port + * @property postgresql Postgresql port + * @property username Postgresql user + * @property password Postgresql password + * @property database Postgresql database + * @property topic Kafka topic + * @property kafka Kafka port */ @Serializable public data class Config( val name: String = "", var backlog: Int = 0, val address: String = "", - val port: Int = 8080, - val postgresql: Int = 5342, + val port: Int = 0, + val postgresql: Int = 0, val username : String = "", val password : String = "", - val database: String = "" + val database: String = "", + val topic : String = "", + val kafka: Int = 0, ){ public companion object{ public var input: InputStream? = null public var output: OutputStream? = null public var connection : Connection? = null + public var kafka : Kafka? = null + public var database : PostgresqlDB? = null public var socket: Socket? = null @@ -57,9 +65,26 @@ public data class Config( public fun getConfigWriter() : OutputStream? { return output } + + public fun setKafkaInstance(kafka : Kafka) { + this.kafka = kafka + } + + public fun getKafkaInstance() : Kafka? { + return this.kafka + } + + public fun setDB(db : PostgresqlDB){ + this.database = db + } + + public fun getDB() : PostgresqlDB?{ + return this.database + } } } /** + * @author Mateusz * Reads `config.json` into Config data class. */ public class ConfigReader { diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt new file mode 100644 index 00000000..48590d9f --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -0,0 +1,41 @@ +package org.opendc.common.utils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.opendc.common.ProtobufMetrics + +import java.util.* + +public class Kafka ( + private val topic : String, + address : String, + port : Int, +) { + private val servers : String = "$address:$port" + private var properties: Properties? = null + private var producer: Producer? = null + + init { + properties = Properties() + properties?.put("bootstrap.servers", servers) + properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") + properties?.put("schema.registry.url", "http://localhost:8081") + properties?.put("auto.register.schemas", "true") + + try { + producer = KafkaProducer(properties) + } catch (e: Exception){ + println("${e.message}") + } + } + + public fun getProducer() : Producer? { + return this.producer + } + + public fun send(value : ProtobufMetrics.ProtoExport){ + producer?.send(ProducerRecord(this.topic, value)) + } + +} \ No newline at end of file diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt index 3dc7a0e4..69314ef3 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt @@ -4,25 +4,40 @@ import java.sql.Connection import java.sql.DriverManager import java.sql.SQLException -public class PostgresqlDB { - public var connection : Connection? = null - public var dbUrl : String = "" - public var user : String = "" - public var password : String = "" +/** + * Represents the Postgresql database. + * On setup cleans the entire database and creates empty tables. + * + * @author Mateusz + * + * @param address ipv4 address + * @param port postgres post + * @param dbName database name + * @param user + * @param password + */ +public class PostgresqlDB( + address : String, + port : Int, + dbName : String, + private var user : String, + private var password : String, +) { + private var connection : Connection? = null + private var dbUrl : String = "" - public fun setupDatabase(address : String, port : Int, dbUser : String, dbPassword : String, db : String){ - dbUrl = "jdbc:postgresql://$address:$port/$db" - user = dbUser - password = dbPassword + init { + dbUrl = "jdbc:postgresql://$address:$port/$dbName" println(dbUrl) try { - connection = DriverManager.getConnection(dbUrl, dbUser, dbPassword) + connection = DriverManager.getConnection(dbUrl, user, password) + clear() + setup() } catch (e: SQLException) { print("${e.message}") } } - - public fun create(){ + public fun setup(){ val CREATE_TABLE = """ CREATE TABLE metrics ( id SERIAL PRIMARY KEY, -- cgit v1.2.3