// path: root/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
// blob: d7ccd38552e5b977c1f0b2442887f3e433033b37 (plain)
package org.opendc.common.utils

import com.fasterxml.jackson.dataformat.toml.TomlMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
import java.util.*
import kotlin.time.Duration.Companion.microseconds
import kotlin.time.toJavaDuration
/**
 * Thin wrapper around the Kafka producer and consumer clients, bound to a single topic.
 *
 * The client configuration is read once, at construction time, from the `/producer.toml`
 * classpath resource. The same [Properties] instance is passed to every producer and consumer
 * created by [getSend] and [getReceive].
 *
 * Construction fails with [IllegalStateException] if [topic] contains a `-`, and with
 * [IllegalArgumentException] if `/producer.toml` cannot be found on the classpath.
 *
 * @param topic the Kafka topic that records are sent to and polled from; must not contain `-`.
 * @author Mateusz Kwiatkowski
 * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
 *     https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
 * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
 *      https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
 */
@Suppress("DEPRECATION")
public class Kafka(private val topic: String) {
    /** Client configuration loaded from `/producer.toml`; assigned exactly once in `init`. */
    private val properties: Properties

    init {
        check(!topic.contains("-")) { "Kafka topic must not contain '-': $topic" }

        // Fail with a clear message here instead of handing a null URL to the TOML reader.
        val configUrl =
            requireNotNull(Kafka::class.java.getResource("/producer.toml")) {
                "Classpath resource /producer.toml not found"
            }
        properties = TomlMapper().readerFor(Properties::class.java).readValue(configUrl)
    }

    /**
     * Creates a new [KafkaProducer] and returns a function that publishes one value to [topic].
     *
     * Every call to this method creates a separate producer; this class never closes it.
     * Neither synchronous nor asynchronous send failures propagate to the caller: in both cases
     * the exception message is printed to standard output.
     *
     * @return a function that sends the given [ProtobufMetrics.ProtoExport] as a key-less record.
     */
    public fun getSend(): (ProtobufMetrics.ProtoExport) -> Unit {
        val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
        return { value ->
            try {
                val record = ProducerRecord<String, ProtobufMetrics.ProtoExport>(topic, value)
                // `send` is asynchronous: delivery errors arrive through the callback, not as an
                // exception from this call, so they must be reported here or they are lost.
                producer.send(record) { _, failure ->
                    if (failure != null) println("${failure.message}")
                }
            } catch (e: Exception) {
                println("${e.message}")
            }
        }
    }

    // TODO: fix
    // NOTE(review): polled records are discarded, and the consumer is built from the same
    // `/producer.toml` properties as the producer - confirm whether a dedicated consumer
    // configuration and a record handler are intended.
    /**
     * Creates a new [KafkaConsumer] and returns a function that subscribes it to [topic] and then
     * polls in an endless loop.
     *
     * The returned function only returns if subscribing or polling throws; the exception message
     * is then printed to standard output. The consumer is never closed by this class.
     *
     * @return a blocking function that subscribes to [topic] and polls until an exception occurs.
     */
    public fun getReceive(): () -> Unit {
        val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
        // 100 ms. A sub-millisecond timeout makes `poll` return almost immediately, which turns
        // the loop below into a busy spin; a short blocking wait keeps it responsive without that.
        val pollTimeout = 100_000.microseconds.toJavaDuration()
        return {
            try {
                consumer.subscribe(listOf(topic))
                while (true) {
                    consumer.poll(pollTimeout)
                }
            } catch (e: Exception) {
                println("${e.message}")
            }
        }
    }
}