From f5da60e4275ca1172128c3994298691e12d5e1f8 Mon Sep 17 00:00:00 2001
From: mjkwiatkowski
Date: Fri, 20 Feb 2026 16:17:39 +0100
Subject: fix: changed the syntax to slowly get rid of the Config class

---
 .../kotlin/org/opendc/common/utils/ConfigParser.kt | 13 ++++-
 .../main/kotlin/org/opendc/common/utils/Kafka.kt   | 67 +++++++++++++---------
 .../kotlin/org/opendc/common/utils/PostgresqlDB.kt | 63 +++++++-------------
 3 files changed, 74 insertions(+), 69 deletions(-)

(limited to 'opendc-common/src/main/kotlin/org/opendc/common')

diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
index e6f18da5..8261f6f0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
@@ -13,7 +13,6 @@ import java.net.Socket
 import java.sql.Connection
 
 /**
- * @author Mateusz
  * @property name
  * @property backlog the amount of connections to accept
  * @property address IPv4 address
@@ -22,8 +21,14 @@ import java.sql.Connection
  * @property username Postgresql user
  * @property password Postgresql password
  * @property database Postgresql database
- * @property topic Kafka topic
+ * @property topic Kafka topic and database table name
  * @property kafka Kafka port
+ * @author Mateusz
+ */
+/*
+
+ Use `by lazy` here.
+ Use design patterns - singleton.
  */
 @Serializable
 public data class Config(
@@ -50,6 +55,10 @@ public data class Config(
 
     public fun setConfigSocket(socket: Socket?){
         this.socket = socket
+        // no try/catch if the exception is not from Java
+        // do not use raw sockets, use a service for the communication
+        // use redis instead of HTTP GET (consider it, but not set in stone)
+        // make an API with Ktor
         try {
             input = socket?.getInputStream()
             output = socket?.getOutputStream()
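The `by lazy` / singleton note added above is only a comment in this commit. As a rough illustration of the intended direction (not part of the change itself; the object name and the /config.toml resource are assumptions), a lazily initialised singleton could load its settings the same way the new Kafka and PostgresqlDB code does:

    import com.fasterxml.jackson.dataformat.toml.TomlMapper
    import java.util.Properties

    // Sketch only: a singleton whose properties are parsed once, on first access.
    object AppConfig {
        val properties: Properties by lazy {
            TomlMapper().readerFor(Properties::class.java)
                .readValue(AppConfig::class.java.getResource("/config.toml"))
        }
    }
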
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index 48590d9f..d7ccd385 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -1,41 +1,56 @@
 package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.opendc.common.ProtobufMetrics
-
 import java.util.*
+import kotlin.time.Duration.Companion.microseconds
+import kotlin.time.toJavaDuration
 
+/**
+ * Represents the Kafka interface.
+ * @constructor `topic` the Kafka topic
+ * @author Mateusz Kwiatkowski
+ * @see
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
+ * @see
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
+ */
-public class Kafka (
-    private val topic : String,
-    address : String,
-    port : Int,
-) {
-    private val servers : String = "$address:$port"
-    private var properties: Properties? = null
-    private var producer: Producer? = null
+@Suppress("DEPRECATION")
+public class Kafka(private val topic: String) {
+    private var properties : Properties
 
     init {
-        properties = Properties()
-        properties?.put("bootstrap.servers", servers)
-        properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-        properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
-        properties?.put("schema.registry.url", "http://localhost:8081")
-        properties?.put("auto.register.schemas", "true")
+        check(!topic.contains("-"))
+        properties = TomlMapper().readerFor(Properties().javaClass)
+            .readValue(Kafka::class.java.getResource("/producer.toml"))
+    }
 
-        try {
-            producer = KafkaProducer(properties)
-        } catch (e: Exception){
+    public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
+        val producer = KafkaProducer(properties)
+        return fun (value : ProtobufMetrics.ProtoExport) {
+            try {
+                producer.send(ProducerRecord(this.topic, value))
+            } catch (e: Exception) {
                 println("${e.message}")
+            }
         }
     }
 
-    public fun getProducer() : Producer? {
-        return this.producer
-    }
-
-    public fun send(value : ProtobufMetrics.ProtoExport){
-        producer?.send(ProducerRecord(this.topic, value))
+    // TODO: fix
+    public fun getReceive() : () -> Unit {
+        val consumer = KafkaConsumer(properties)
+        return fun() : Unit {
+            try {
+                consumer.subscribe(listOf(topic))
+                while (true) {
+                    consumer.poll(1.microseconds.toJavaDuration())
+                }
+            } catch (e: Exception) {
+                println("${e.message}")
+            }
+        }
     }
-
 }
\ No newline at end of file
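After this change a caller no longer holds a Producer directly; it asks the Kafka wrapper for a send function and invokes it once per record. A minimal usage sketch (illustrative only; the "metrics" topic name and the export value are assumptions, and /producer.toml must be on the classpath):

    import org.opendc.common.ProtobufMetrics
    import org.opendc.common.utils.Kafka

    // Sketch only: publish one ProtoExport message through the closure-based API.
    fun publishExport(export: ProtobufMetrics.ProtoExport) {
        val send = Kafka("metrics").getSend()   // builds the KafkaProducer once
        send(export)                            // each call sends one ProducerRecord
    }
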
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
index 69314ef3..361925ee 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
@@ -1,71 +1,52 @@
 package org.opendc.common.utils
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
 import java.sql.Connection
 import java.sql.DriverManager
 import java.sql.SQLException
-
+import java.util.Properties
 
 /**
  * Represents the Postgresql database.
- * On setup cleans the entire database and creates empty tables.
+ * On setup cleans the entire database.
  *
- * @author Mateusz
+ * @author Mateusz Kwiatkowski
  *
- * @param address ipv4 address
- * @param port postgres post
- * @param dbName database name
- * @param user
- * @param password
+ * @see
+ * https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html
  */
-public class PostgresqlDB(
-    address : String,
-    port : Int,
-    dbName : String,
-    private var user : String,
-    private var password : String,
-) {
+@Suppress("DEPRECATION")
+public class PostgresqlDB {
+    private var properties = Properties()
     private var connection : Connection? = null
-    private var dbUrl : String = ""
 
     init {
-        dbUrl = "jdbc:postgresql://$address:$port/$dbName"
-        println(dbUrl)
         try {
-            connection = DriverManager.getConnection(dbUrl, user, password)
+            properties = TomlMapper().readerFor(Properties().javaClass)
+                .readValue(PostgresqlDB::class.java.getResource("/database.toml"))
+            connection = DriverManager.getConnection(
+                properties.getProperty("address").asJdbc(properties.getProperty("table")),
+                properties.getProperty("user"),
+                properties.getProperty("password"))
             clear()
-            setup()
         } catch (e: SQLException) {
             print("${e.message}")
        }
    }
 
-    public fun setup(){
-        val CREATE_TABLE = """
-            CREATE TABLE metrics (
-            id SERIAL PRIMARY KEY,
-            timestamp bigint,
-            tasksActive integer,
-            clusterName varchar(10));
-            """.trimIndent()
-
-        try {
-            val conn = DriverManager.getConnection(dbUrl, user, password)
-            val st = conn.createStatement()
-            st.executeQuery(CREATE_TABLE)
-        } catch (e: SQLException){
-            println("${e.message}")
-        }
-    }
-
     public fun clear(){
         val DELETE_ALL_TABLES = """
             DROP SCHEMA public CASCADE;
             CREATE SCHEMA public;
             """.trimIndent()
         try {
-            val conn = DriverManager.getConnection(dbUrl, user, password)
-            val st = conn.createStatement()
-            st.executeQuery(DELETE_ALL_TABLES)
+            val st = connection?.createStatement()
+            st?.executeQuery(DELETE_ALL_TABLES)
         } catch (e: SQLException){
             println("${e.message}")
         }
     }
+
+    private fun String.asJdbc(table : String) : String {
+        return "jdbc:postgresql://$this/$table"
+    }
+
 }
\ No newline at end of file
-- 
cgit v1.2.3
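For reference, the refactored PostgresqlDB reads its connection settings from a /database.toml resource and builds the JDBC URL through the private String.asJdbc(table) extension. A small sketch of that mapping (the address and table values here are made-up examples, not part of this commit):

    // Sketch only: how asJdbc() turns the configured address and table into a JDBC URL.
    fun main() {
        val address = "localhost:5432"   // properties.getProperty("address")
        val table = "opendc"             // properties.getProperty("table")
        println("jdbc:postgresql://$address/$table")
        // -> jdbc:postgresql://localhost:5432/opendc
    }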