diff options
| author | mjkwiatkowski <mati.rewa@gmail.com> | 2026-02-20 16:17:39 +0100 |
|---|---|---|
| committer | mjkwiatkowski <mati.rewa@gmail.com> | 2026-02-20 16:17:39 +0100 |
| commit | f5da60e4275ca1172128c3994298691e12d5e1f8 (patch) | |
| tree | 189804251bf88bf390e1c9ffb4472b7a798d7f22 /opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt | |
| parent | 2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 (diff) | |
Diffstat (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt')
| -rw-r--r-- | opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt | 67 |
1 files changed, 41 insertions, 26 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt index 48590d9f..d7ccd385 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -1,41 +1,56 @@ package org.opendc.common.utils + +import com.fasterxml.jackson.dataformat.toml.TomlMapper +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.opendc.common.ProtobufMetrics - import java.util.* +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.toJavaDuration +/** + * Represents the Kafka interface. + * @constructor `topic` the Kafka topic + * @author Mateusz Kwiatkowski + * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html> + * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a> + * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html> + * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a> + */ -public class Kafka ( - private val topic : String, - address : String, - port : Int, -) { - private val servers : String = "$address:$port" - private var properties: Properties? = null - private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null +@Suppress("DEPRECATION") +public class Kafka(private val topic: String) { + private var properties : Properties init { - properties = Properties() - properties?.put("bootstrap.servers", servers) - properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") - properties?.put("schema.registry.url", "http://localhost:8081") - properties?.put("auto.register.schemas", "true") + check(!topic.contains("-")) + properties = TomlMapper().readerFor(Properties().javaClass) + .readValue(Kafka::class.java.getResource("/producer.toml")) + } - try { - producer = KafkaProducer(properties) - } catch (e: Exception){ + public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit { + val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties) + return fun (value : ProtobufMetrics.ProtoExport) { + try { + producer.send(ProducerRecord(this.topic, value)) + } catch (e: Exception) { println("${e.message}") + } } } - public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? { - return this.producer - } - - public fun send(value : ProtobufMetrics.ProtoExport){ - producer?.send(ProducerRecord(this.topic, value)) + // TODO: fix + public fun getReceive() : () -> Unit { + val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties) + return fun() : Unit { + try { + consumer.subscribe(listOf(topic)) + while (true) { + consumer.poll(1.microseconds.toJavaDuration()) + } + } catch (e: Exception) { + println("${e.message}") + } + } } - }
\ No newline at end of file |
