Diffstat (limited to 'opendc-common/src/main/kotlin/org/opendc')
4 files changed, 89 insertions, 164 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
deleted file mode 100644
index e6f18da5..00000000
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/ConfigParser.kt
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.opendc.common.utils
-
-import kotlinx.serialization.ExperimentalSerializationApi
-import kotlinx.serialization.Serializable
-import kotlinx.serialization.json.Json
-import kotlinx.serialization.json.decodeFromStream
-
-import java.io.File
-import java.io.IOException
-import java.io.InputStream
-import java.io.OutputStream
-import java.net.Socket
-import java.sql.Connection
-
-/**
- * @author Mateusz
- * @property name
- * @property backlog the amount of connections to accept
- * @property address IPv4 address
- * @property port
- * @property postgresql Postgresql port
- * @property username Postgresql user
- * @property password Postgresql password
- * @property database Postgresql database
- * @property topic Kafka topic
- * @property kafka Kafka port
- */
-@Serializable
-public data class Config(
-    val name: String = "",
-    var backlog: Int = 0,
-    val address: String = "",
-    val port: Int = 0,
-    val postgresql: Int = 0,
-    val username : String = "",
-    val password : String = "",
-    val database: String = "",
-    val topic : String = "",
-    val kafka: Int = 0,
-){
-
-    public companion object{
-        public var input: InputStream? = null
-        public var output: OutputStream? = null
-        public var connection : Connection? = null
-        public var kafka : Kafka? = null
-        public var database : PostgresqlDB? = null
-
-        public var socket: Socket? = null
-
-        public fun setConfigSocket(socket: Socket?){
-            this.socket = socket
-            try {
-                input = socket?.getInputStream()
-                output = socket?.getOutputStream()
-            } catch (e: IOException){
-                print("${e.message}")
-            }
-        }
-
-        public fun getConfigReader() : InputStream? {
-            return input
-        }
-
-        public fun getConfigWriter() : OutputStream? {
-            return output
-        }
-
-        public fun setKafkaInstance(kafka : Kafka) {
-            this.kafka = kafka
-        }
-
-        public fun getKafkaInstance() : Kafka? {
-            return this.kafka
-        }
-
-        public fun setDB(db : PostgresqlDB){
-            this.database = db
-        }
-
-        public fun getDB() : PostgresqlDB?{
-            return this.database
-        }
-    }
-}
-/**
- * @author Mateusz
- * Reads `config.json` into Config data class.
- */
-public class ConfigReader {
-    private val jsonReader = Json
-    public fun read(file: File): Config = read(file.inputStream())
-    @OptIn(ExperimentalSerializationApi::class)
-    public fun read(input: InputStream): Config {
-        return jsonReader.decodeFromStream<Config>(input)
-    }
-}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt
new file mode 100644
index 00000000..9db7bfaf
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/JavalinRunner.kt
@@ -0,0 +1,25 @@
+package org.opendc.common.utils
+
+import io.javalin.Javalin
+import io.javalin.http.Context
+import io.javalin.http.Handler
+
+public class JavalinRunner {
+
+    private val handleHello: Handler = Handler { ctx ->
+        ctx.status(200)
+        ctx.contentType("application/x-protobuf")
+        ctx.result("Hello world")
+    }
+
+    init {
+        val app = Javalin.create().start()
+
+        app.get("/hello", handleHello)
+
+        app.exception<Exception?>(Exception::class.java, { e: Exception?, ctx: Context? ->
+            e!!.printStackTrace()
+            ctx!!.status(500)
+        })
+    }
+}
\ No newline at end of file
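
A minimal usage sketch for the new JavalinRunner (not part of this change set). Constructing the class starts the embedded Javalin server from its init block; the port below is an assumption, since Javalin.create().start() is called without an explicit port and falls back to the library default.

package org.opendc.common.utils

// Sketch only: boot the server and probe the /hello route it registers.
fun main() {
    JavalinRunner()
    // From a shell, assuming the default port resolves to 8080:
    //   curl -i http://localhost:8080/hello
    // Expected: HTTP 200, Content-Type application/x-protobuf, body "Hello world".
}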
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index 48590d9f..d7ccd385 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -1,41 +1,56 @@
 package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.opendc.common.ProtobufMetrics
-
 import java.util.*
+import kotlin.time.Duration.Companion.microseconds
+import kotlin.time.toJavaDuration
+/**
+ * Represents the Kafka interface.
+ * @constructor `topic` the Kafka topic
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
+ */
 
-public class Kafka (
-    private val topic : String,
-    address : String,
-    port : Int,
-) {
-    private val servers : String = "$address:$port"
-    private var properties: Properties? = null
-    private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null
+@Suppress("DEPRECATION")
+public class Kafka(private val topic: String) {
+    private var properties : Properties
 
     init {
-        properties = Properties()
-        properties?.put("bootstrap.servers", servers)
-        properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-        properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
-        properties?.put("schema.registry.url", "http://localhost:8081")
-        properties?.put("auto.register.schemas", "true")
+        check(!topic.contains("-"))
+        properties = TomlMapper().readerFor(Properties().javaClass)
+            .readValue(Kafka::class.java.getResource("/producer.toml"))
+    }
 
-        try {
-            producer = KafkaProducer(properties)
-        } catch (e: Exception){
+    public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
+        val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
+        return fun (value : ProtobufMetrics.ProtoExport) {
+            try {
+                producer.send(ProducerRecord(this.topic, value))
+            } catch (e: Exception) {
             println("${e.message}")
+            }
         }
     }
 
-    public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
-        return this.producer
-    }
-
-    public fun send(value : ProtobufMetrics.ProtoExport){
-        producer?.send(ProducerRecord(this.topic, value))
+    // TODO: fix
+    public fun getReceive() : () -> Unit {
+        val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
+        return fun() : Unit {
+            try {
+                consumer.subscribe(listOf(topic))
+                while (true) {
+                    consumer.poll(1.microseconds.toJavaDuration())
+                }
+            } catch (e: Exception) {
+                println("${e.message}")
+            }
+        }
     }
-
 }
\ No newline at end of file
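
A hedged sketch of how the reworked producer side might be used (not part of this change set). The topic name is illustrative, and the contents of the /producer.toml resource are an assumption: the diff only shows that the producer properties are now loaded from that classpath resource via Jackson's TomlMapper, while the deleted constructor suggests keys such as bootstrap.servers, key.serializer, value.serializer and schema.registry.url.

package org.opendc.common.utils

import org.opendc.common.ProtobufMetrics

// Sketch only: obtain the send closure and publish a single protobuf record.
fun publishExample(export: ProtobufMetrics.ProtoExport) {
    val kafka = Kafka("metrics")   // illustrative topic; must not contain '-' per the check() above
    val send = kafka.getSend()     // builds a KafkaProducer from the loaded properties
    send(export)                   // errors are printed inside the closure, not rethrown
}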
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
index 69314ef3..03fd902c 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/PostgresqlDB.kt
@@ -1,71 +1,53 @@
 package org.opendc.common.utils
 
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
 import java.sql.Connection
 import java.sql.DriverManager
 import java.sql.SQLException
-
+import java.util.Properties
 /**
  * Represents the Postgresql database.
- * On setup cleans the entire database and creates empty tables.
+ * On setup cleans the entire database.
  *
- * @author Mateusz
+ * @author Mateusz Kwiatkowski
  *
- * @param address ipv4 address
- * @param port postgres post
- * @param dbName database name
- * @param user
- * @param password
+ * @see <a href=https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html>
+ * https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/DriverManager.html</a>
  */
-public class PostgresqlDB(
-    address : String,
-    port : Int,
-    dbName : String,
-    private var user : String,
-    private var password : String,
-) {
+@Suppress("DEPRECATION")
+public class PostgresqlDB {
+    private var properties = Properties()
     private var connection : Connection? = null
-    private var dbUrl : String = ""
     init {
-        dbUrl = "jdbc:postgresql://$address:$port/$dbName"
-        println(dbUrl)
         try {
-            connection = DriverManager.getConnection(dbUrl, user, password)
+            properties = TomlMapper().readerFor(Properties().javaClass)
+                .readValue(PostgresqlDB::class.java.getResource("/database.toml"))
+            connection = DriverManager.getConnection(
+                properties.getProperty("address").asJdbc(properties.getProperty("database")),
+                properties.getProperty("user"),
+                properties.getProperty("password"))
             clear()
-            setup()
         } catch (e: SQLException) {
             print("${e.message}")
        }
     }
 
-    public fun setup(){
-        val CREATE_TABLE = """
-            CREATE TABLE metrics (
-            id SERIAL PRIMARY KEY,
-            timestamp bigint,
-            tasksActive integer,
-            clusterName varchar(10));
-        """.trimIndent()
-
-        try {
-            val conn = DriverManager.getConnection(dbUrl, user, password)
-            val st = conn.createStatement()
-            st.executeQuery(CREATE_TABLE)
-        } catch (e: SQLException){
-            println("${e.message}")
-        }
-    }
-
     public fun clear(){
         val DELETE_ALL_TABLES = """
             DROP SCHEMA public CASCADE;
             CREATE SCHEMA public;
         """.trimIndent()
         try {
-            val conn = DriverManager.getConnection(dbUrl, user, password)
-            val st = conn.createStatement()
-            st.executeQuery(DELETE_ALL_TABLES)
+            val st = connection?.createStatement()
+            st?.executeQuery(DELETE_ALL_TABLES)
         } catch (e: SQLException){
             println("${e.message}")
        }
     }
+
+    private fun String.asJdbc(database : String) : String {
+        return "jdbc:postgresql://$this/$database"
+
+    }
+
 }
\ No newline at end of file
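
A sketch of the configuration PostgresqlDB now expects (not part of this change set). The key names (address, database, user, password) come from the getProperty calls above; the values, and the reading that address holds a host or host:port which asJdbc() expands into a jdbc:postgresql:// URL, are assumptions about the /database.toml resource, which is not included in this diff.

package org.opendc.common.utils

import java.util.Properties

// Sketch only: the keys read from /database.toml, modelled as a plain Properties object.
fun exampleDatabaseProperties(): Properties {
    val props = Properties()
    props.setProperty("address", "localhost:5432")  // placeholder host:port
    props.setProperty("database", "opendc")         // placeholder database name
    props.setProperty("user", "postgres")           // placeholder user
    props.setProperty("password", "postgres")       // placeholder password
    return props
}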
