blob: d7ccd38552e5b977c1f0b2442887f3e433033b37 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package org.opendc.common.utils
import com.fasterxml.jackson.dataformat.toml.TomlMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
import java.util.*
import kotlin.time.Duration.Companion.microseconds
import kotlin.time.toJavaDuration
/**
* Represents the Kafka interface.
* @constructor `topic` the Kafka topic
* @author Mateusz Kwiatkowski
* @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
* https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
* @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
* https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
*/
@Suppress("DEPRECATION")
public class Kafka(private val topic: String) {
private var properties : Properties
init {
check(!topic.contains("-"))
properties = TomlMapper().readerFor(Properties().javaClass)
.readValue(Kafka::class.java.getResource("/producer.toml"))
}
public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
return fun (value : ProtobufMetrics.ProtoExport) {
try {
producer.send(ProducerRecord(this.topic, value))
} catch (e: Exception) {
println("${e.message}")
}
}
}
// TODO: fix
public fun getReceive() : () -> Unit {
val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
return fun() : Unit {
try {
consumer.subscribe(listOf(topic))
while (true) {
consumer.poll(1.microseconds.toJavaDuration())
}
} catch (e: Exception) {
println("${e.message}")
}
}
}
}
|