From 2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Mon, 16 Feb 2026 15:18:21 +0100 Subject: feat: opendc -> kafka -> postgresql works; added protobuf encoding --- .../main/kotlin/org/opendc/common/utils/Kafka.kt | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt') diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt new file mode 100644 index 00000000..48590d9f --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -0,0 +1,41 @@ +package org.opendc.common.utils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.opendc.common.ProtobufMetrics + +import java.util.* + +public class Kafka ( + private val topic : String, + address : String, + port : Int, +) { + private val servers : String = "$address:$port" + private var properties: Properties? = null + private var producer: Producer? = null + + init { + properties = Properties() + properties?.put("bootstrap.servers", servers) + properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") + properties?.put("schema.registry.url", "http://localhost:8081") + properties?.put("auto.register.schemas", "true") + + try { + producer = KafkaProducer(properties) + } catch (e: Exception){ + println("${e.message}") + } + } + + public fun getProducer() : Producer? { + return this.producer + } + + public fun send(value : ProtobufMetrics.ProtoExport){ + producer?.send(ProducerRecord(this.topic, value)) + } + +} \ No newline at end of file -- cgit v1.2.3