summaryrefslogtreecommitdiff
path: root/opendc-common/src/main/kotlin
diff options
context:
space:
mode:
authormjkwiatkowski <mati.rewa@gmail.com>2026-02-20 16:17:39 +0100
committermjkwiatkowski <mati.rewa@gmail.com>2026-02-20 16:17:39 +0100
commitf5da60e4275ca1172128c3994298691e12d5e1f8 (patch)
tree189804251bf88bf390e1c9ffb4472b7a798d7f22 /opendc-common/src/main/kotlin
parent2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (diff)
fix: changed the syntex to slowly get rid of the Config classHEADmaster
Diffstat (limited to 'opendc-common/src/main/kotlin')
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt13
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt67
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt63
3 files changed, 74 insertions, 69 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
index e6f18da5..8261f6f0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
@@ -13,7 +13,6 @@ import java.net.Socket
import java.sql.Connection
/**
- * @author Mateusz
* @property name
* @property backlog the amount of connections to accept
* @property address IPv4 address
@@ -22,8 +21,14 @@ import java.sql.Connection
* @property username Postgresql user
* @property password Postgresql password
* @property database Postgresql database
- * @property topic Kafka topic
+ * @property topic Kafka topic and database table name
* @property kafka Kafka port
+ * @author Mateusz
+ */
+/*
+
+ Use `by lazy` here.
+ Use design patterns - singleton.
*/
@Serializable
public data class Config(
@@ -50,6 +55,10 @@ public data class Config(
public fun setConfigSocket(socket: Socket?){
this.socket = socket
+ // no try catch if the exception is not from Java
+ // do not use raw sockets, use a service for the communication
+ // use redis instead of HTTP GET (consider it, but not bound in stone)
+ // make an API KTor
try {
input = socket?.getInputStream()
output = socket?.getOutputStream()
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index 48590d9f..d7ccd385 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -1,41 +1,56 @@
package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
-
import java.util.*
+import kotlin.time.Duration.Companion.microseconds
+import kotlin.time.toJavaDuration
+/**
+ * Represents the Kafka interface.
+ * @constructor `topic` the Kafka topic
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
+ */
-public class Kafka (
- private val topic : String,
- address : String,
- port : Int,
-) {
- private val servers : String = "$address:$port"
- private var properties: Properties? = null
- private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null
+@Suppress("DEPRECATION")
+public class Kafka(private val topic: String) {
+ private var properties : Properties
init {
- properties = Properties()
- properties?.put("bootstrap.servers", servers)
- properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
- properties?.put("schema.registry.url", "http://localhost:8081")
- properties?.put("auto.register.schemas", "true")
+ check(!topic.contains("-"))
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(Kafka::class.java.getResource("/producer.toml"))
+ }
- try {
- producer = KafkaProducer(properties)
- } catch (e: Exception){
+ public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
+ val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
+ return fun (value : ProtobufMetrics.ProtoExport) {
+ try {
+ producer.send(ProducerRecord(this.topic, value))
+ } catch (e: Exception) {
println("${e.message}")
+ }
}
}
- public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
- return this.producer
- }
-
- public fun send(value : ProtobufMetrics.ProtoExport){
- producer?.send(ProducerRecord(this.topic, value))
+ // TODO: fix
+ public fun getReceive() : () -> Unit {
+ val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
+ return fun() : Unit {
+ try {
+ consumer.subscribe(listOf(topic))
+ while (true) {
+ consumer.poll(1.microseconds.toJavaDuration())
+ }
+ } catch (e: Exception) {
+ println("${e.message}")
+ }
+ }
}
-
} \ No newline at end of file
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
index 69314ef3..361925ee 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
@@ -1,71 +1,52 @@
package org.opendc.common.utils
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
-
+import java.util.Properties
/**
* Represents the Postgresql database.
- * On setup cleans the entire database and creates empty tables.
+ * On setup cleans the entire database.
*
- * @author Mateusz
+ * @author Mateusz Kwiatkowski
*
- * @param address ipv4 address
- * @param port postgres post
- * @param dbName database name
- * @param user
- * @param password
+ * @see <a href=https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html>
+ * https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html</a>
*/
-public class PostgresqlDB(
- address : String,
- port : Int,
- dbName : String,
- private var user : String,
- private var password : String,
-) {
+@Suppress("DEPRECATION")
+public class PostgresqlDB {
+ private var properties = Properties()
private var connection : Connection? = null
- private var dbUrl : String = ""
init {
- dbUrl = "jdbc:postgresql://$address:$port/$dbName"
- println(dbUrl)
try {
- connection = DriverManager.getConnection(dbUrl, user, password)
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(PostgresqlDB::class.java.getResource("/database.toml"))
+ connection = DriverManager.getConnection(
+ properties.getProperty("address").asJdbc(properties.getProperty("table")),
+ properties.getProperty("user"),
+ properties.getProperty("password"))
clear()
- setup()
} catch (e: SQLException) {
print("${e.message}")
}
}
- public fun setup(){
- val CREATE_TABLE = """
- CREATE TABLE metrics (
- id SERIAL PRIMARY KEY,
- timestamp bigint,
- tasksActive integer,
- clusterName varchar(10));
- """.trimIndent()
-
- try {
- val conn = DriverManager.getConnection(dbUrl, user, password)
- val st = conn.createStatement()
- st.executeQuery(CREATE_TABLE)
- } catch (e: SQLException){
- println("${e.message}")
- }
- }
-
public fun clear(){
val DELETE_ALL_TABLES = """
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
""".trimIndent()
try {
- val conn = DriverManager.getConnection(dbUrl, user, password)
- val st = conn.createStatement()
- st.executeQuery(DELETE_ALL_TABLES)
+ val st = connection?.createStatement()
+ st?.executeQuery(DELETE_ALL_TABLES)
} catch (e: SQLException){
println("${e.message}")
}
}
+
+ private fun String.asJdbc(table : String) : String {
+ return "jdbc:postgresql://$this/$table"
+ }
+
} \ No newline at end of file