blob: 48590d9f42171c0bfc495195e419c513d5135965 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package org.opendc.common.utils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
import java.util.*
/**
 * Thin wrapper around a Kafka [Producer] that publishes [ProtobufMetrics.ProtoExport] messages to one topic.
 *
 * The producer is created eagerly at construction time. Creation is best-effort: if the Kafka client
 * throws while being constructed, the failure is reported on standard error, [getProducer] returns
 * `null`, and [send] becomes a no-op.
 *
 * Call [close] (or use the instance with `use { }`) when done so the underlying producer is released.
 *
 * @param topic Name of the topic every record is published to.
 * @param address Host name or IP address of the Kafka bootstrap server.
 * @param port Port of the Kafka bootstrap server.
 * @param schemaRegistryUrl URL passed to the serializer as `schema.registry.url`.
 */
public class Kafka @JvmOverloads constructor(
    private val topic: String,
    address: String,
    port: Int,
    schemaRegistryUrl: String = "http://localhost:8081",
) : AutoCloseable {
    /** Bootstrap server list in `host:port` form. */
    private val servers: String = "$address:$port"

    /** The underlying producer, or `null` when it could not be created. */
    private val producer: Producer<String, ProtobufMetrics.ProtoExport>?

    init {
        // The configuration is only needed while constructing the producer, so it stays local.
        val properties =
            Properties().apply {
                setProperty("bootstrap.servers", servers)
                setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
                setProperty("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
                setProperty("schema.registry.url", schemaRegistryUrl)
                setProperty("auto.register.schemas", "true")
            }

        producer =
            try {
                KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
            } catch (e: Exception) {
                // Deliberately best-effort: report the failure and carry on without a producer.
                System.err.println("Failed to create Kafka producer for $servers: ${e.message}")
                null
            }
    }

    /**
     * Returns the underlying producer, or `null` if it could not be created during construction.
     */
    public fun getProducer(): Producer<String, ProtobufMetrics.ProtoExport>? = producer

    /**
     * Publishes [value] to the configured topic without a record key.
     *
     * Does nothing when the producer was not created. Failures reported by the producer through its
     * completion callback are written to standard error instead of being dropped.
     */
    public fun send(value: ProtobufMetrics.ProtoExport) {
        producer?.send(ProducerRecord(topic, value)) { _, exception ->
            if (exception != null) {
                System.err.println("Failed to send record to topic '$topic': ${exception.message}")
            }
        }
    }

    /**
     * Closes the underlying producer, if one was created.
     */
    public override fun close() {
        producer?.close()
    }
}
|