summaryrefslogtreecommitdiff
path: root/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
blob: 48590d9f42171c0bfc495195e419c513d5135965 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package org.opendc.common.utils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics

import java.util.*

public class Kafka (
    private val topic : String,
    address : String,
    port : Int,
) {
    private val servers : String = "$address:$port"
    private var properties: Properties? = null
    private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null

    init {
        properties = Properties()
        properties?.put("bootstrap.servers", servers)
        properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
        properties?.put("schema.registry.url", "http://localhost:8081")
        properties?.put("auto.register.schemas", "true")

        try {
            producer = KafkaProducer(properties)
        } catch (e: Exception){
            println("${e.message}")
        }
    }

    public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
        return this.producer
    }

    public fun send(value : ProtobufMetrics.ProtoExport){
        producer?.send(ProducerRecord(this.topic, value))
    }

}