package org.opendc.common.utils import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.opendc.common.ProtobufMetrics import java.util.* public class Kafka ( private val topic : String, address : String, port : Int, ) { private val servers : String = "$address:$port" private var properties: Properties? = null private var producer: Producer? = null init { properties = Properties() properties?.put("bootstrap.servers", servers) properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") properties?.put("schema.registry.url", "http://localhost:8081") properties?.put("auto.register.schemas", "true") try { producer = KafkaProducer(properties) } catch (e: Exception){ println("${e.message}") } } public fun getProducer() : Producer? { return this.producer } public fun send(value : ProtobufMetrics.ProtoExport){ producer?.send(ProducerRecord(this.topic, value)) } }