diff options
Diffstat (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt')
| -rw-r--r-- | opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt new file mode 100644 index 00000000..48590d9f --- /dev/null +++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt @@ -0,0 +1,41 @@ +package org.opendc.common.utils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.opendc.common.ProtobufMetrics + +import java.util.* + +public class Kafka ( + private val topic : String, + address : String, + port : Int, +) { + private val servers : String = "$address:$port" + private var properties: Properties? = null + private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null + + init { + properties = Properties() + properties?.put("bootstrap.servers", servers) + properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer") + properties?.put("schema.registry.url", "http://localhost:8081") + properties?.put("auto.register.schemas", "true") + + try { + producer = KafkaProducer(properties) + } catch (e: Exception){ + println("${e.message}") + } + } + + public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? { + return this.producer + } + + public fun send(value : ProtobufMetrics.ProtoExport){ + producer?.send(ProducerRecord(this.topic, value)) + } + +}
\ No newline at end of file |
