package org.opendc.common.utils import com.fasterxml.jackson.dataformat.toml.TomlMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.opendc.common.ProtobufMetrics import java.util.* import kotlin.time.Duration.Companion.microseconds import kotlin.time.toJavaDuration /** * Represents the Kafka interface. * @constructor `topic` the Kafka topic * @author Mateusz Kwiatkowski * @see * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html * @see * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html */ @Suppress("DEPRECATION") public class Kafka(private val topic: String) { private var properties : Properties init { check(!topic.contains("-")) properties = TomlMapper().readerFor(Properties().javaClass) .readValue(Kafka::class.java.getResource("/producer.toml")) } public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit { val producer = KafkaProducer(properties) return fun (value : ProtobufMetrics.ProtoExport) { try { producer.send(ProducerRecord(this.topic, value)) } catch (e: Exception) { println("${e.message}") } } } // TODO: fix public fun getReceive() : () -> Unit { val consumer = KafkaConsumer(properties) return fun() : Unit { try { consumer.subscribe(listOf(topic)) while (true) { consumer.poll(1.microseconds.toJavaDuration()) } } catch (e: Exception) { println("${e.message}") } } } }