summaryrefslogtreecommitdiff
path: root/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt')
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt41
1 files changed, 41 insertions, 0 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
new file mode 100644
index 00000000..48590d9f
--- /dev/null
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -0,0 +1,41 @@
+package org.opendc.common.utils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.Producer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.opendc.common.ProtobufMetrics
+
+import java.util.*
+
+public class Kafka (
+ private val topic : String,
+ address : String,
+ port : Int,
+) {
+ private val servers : String = "$address:$port"
+ private var properties: Properties? = null
+ private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null
+
+ init {
+ properties = Properties()
+ properties?.put("bootstrap.servers", servers)
+ properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+ properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
+ properties?.put("schema.registry.url", "http://localhost:8081")
+ properties?.put("auto.register.schemas", "true")
+
+ try {
+ producer = KafkaProducer(properties)
+ } catch (e: Exception){
+ println("${e.message}")
+ }
+ }
+
+ public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
+ return this.producer
+ }
+
+ public fun send(value : ProtobufMetrics.ProtoExport){
+ producer?.send(ProducerRecord(this.topic, value))
+ }
+
+} \ No newline at end of file