package org.opendc.common.utils
import com.fasterxml.jackson.dataformat.toml.TomlMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
import java.util.*
import kotlin.time.Duration.Companion.microseconds
import kotlin.time.toJavaDuration
/**
* Represents the Kafka interface.
* @constructor `topic` the Kafka topic
* @author Mateusz Kwiatkowski
* @see
* https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* @see
* https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
*/
@Suppress("DEPRECATION")
public class Kafka(private val topic: String) {
private var properties : Properties
init {
check(!topic.contains("-"))
properties = TomlMapper().readerFor(Properties().javaClass)
.readValue(Kafka::class.java.getResource("/producer.toml"))
}
public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
val producer = KafkaProducer(properties)
return fun (value : ProtobufMetrics.ProtoExport) {
try {
producer.send(ProducerRecord(this.topic, value))
} catch (e: Exception) {
println("${e.message}")
}
}
}
// TODO: fix
public fun getReceive() : () -> Unit {
val consumer = KafkaConsumer(properties)
return fun() : Unit {
try {
consumer.subscribe(listOf(topic))
while (true) {
consumer.poll(1.microseconds.toJavaDuration())
}
} catch (e: Exception) {
println("${e.message}")
}
}
}
}