summaryrefslogtreecommitdiff
path: root/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt')
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt67
1 files changed, 41 insertions, 26 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
index 48590d9f..d7ccd385 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/utils/Kafka.kt
@@ -1,41 +1,56 @@
package org.opendc.common.utils
+
+import com.fasterxml.jackson.dataformat.toml.TomlMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.opendc.common.ProtobufMetrics
-
import java.util.*
+import kotlin.time.Duration.Companion.microseconds
+import kotlin.time.toJavaDuration
+/**
+ * Represents the Kafka interface.
+ * @constructor `topic` the Kafka topic
+ * @author Mateusz Kwiatkowski
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html</a>
+ * @see <a href=https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
+ * https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
+ */
-public class Kafka (
- private val topic : String,
- address : String,
- port : Int,
-) {
- private val servers : String = "$address:$port"
- private var properties: Properties? = null
- private var producer: Producer<String, ProtobufMetrics.ProtoExport>? = null
+@Suppress("DEPRECATION")
+public class Kafka(private val topic: String) {
+ private var properties : Properties
init {
- properties = Properties()
- properties?.put("bootstrap.servers", servers)
- properties?.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
- properties?.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
- properties?.put("schema.registry.url", "http://localhost:8081")
- properties?.put("auto.register.schemas", "true")
+ check(!topic.contains("-"))
+ properties = TomlMapper().readerFor(Properties().javaClass)
+ .readValue(Kafka::class.java.getResource("/producer.toml"))
+ }
- try {
- producer = KafkaProducer(properties)
- } catch (e: Exception){
+ public fun getSend() : (ProtobufMetrics.ProtoExport) -> Unit {
+ val producer = KafkaProducer<String, ProtobufMetrics.ProtoExport>(properties)
+ return fun (value : ProtobufMetrics.ProtoExport) {
+ try {
+ producer.send(ProducerRecord(this.topic, value))
+ } catch (e: Exception) {
println("${e.message}")
+ }
}
}
- public fun getProducer() : Producer<String, ProtobufMetrics.ProtoExport>? {
- return this.producer
- }
-
- public fun send(value : ProtobufMetrics.ProtoExport){
- producer?.send(ProducerRecord(this.topic, value))
+ // TODO: fix
+ public fun getReceive() : () -> Unit {
+ val consumer = KafkaConsumer<String, ProtobufMetrics.ProtoExport>(properties)
+ return fun() : Unit {
+ try {
+ consumer.subscribe(listOf(topic))
+ while (true) {
+ consumer.poll(1.microseconds.toJavaDuration())
+ }
+ } catch (e: Exception) {
+ println("${e.message}")
+ }
+ }
}
-
} \ No newline at end of file