package org.opendc.common.utils

import com.fasterxml.jackson.dataformat.toml.TomlMapper
import redis.clients.jedis.RedisClient
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.params.XReadParams
import java.util.Map
import java.util.Properties

/**
 * This class represents the Redis server instance.
 *
 * The connection settings are read from the `/subscriber.toml` classpath resource:
 * `port` is the port of the Redis server on `localhost`, and `table` is the key of the
 * stream that [readOne] listens to.
 *
 * Instances are obtained through [Redis.getInstance]; the constructor is private.
 *
 * @author Mateusz Kwiatkowski
 * @see https://redis.io/docs/latest/
 * @see https://redis.io/docs/latest/develop/data-types/streams/
 * @see https://redis.io/docs/latest/develop/use-cases/streaming/java-jedis/
 * @see https://www.javadoc.io/doc/redis.clients/jedis/latest/index.html
 */
@Suppress("DEPRECATION")
public class Redis private constructor() {
    /** Settings loaded once from the TOML resource. */
    private val properties: Properties = loadProperties()

    /**
     * Client connected to the Redis server on `localhost`.
     * It is intentionally never closed for now, because [readOne] listens indefinitely.
     */
    private val jedis: RedisClient = RedisClient.create("redis://localhost:${requiredProperty("port")}")

    /**
     * Listens to the configured stream with `XREAD` and prints the ID of every entry whose
     * `downtime` field is a number greater than zero.
     *
     * The ID of the last processed entry is remembered, so each entry is handled once
     * instead of the whole stream being re-read on every iteration. Each `XREAD` blocks
     * for up to [BLOCK_TIMEOUT_MS] milliseconds while the stream has no new entries, so
     * the loop does not busy-spin.
     *
     * This function loops forever and never returns normally.
     *
     * @throws IllegalStateException if the `table` setting is missing.
     */
    public fun readOne() {
        val streamKey = requiredProperty("table")
        // Start from the beginning of the stream (0-0) and advance as entries are consumed.
        var lastSeenId = StreamEntryID()

        while (true) {
            // A null reply means the blocking read timed out without new entries.
            val messages =
                jedis.xread(
                    XReadParams.xReadParams().block(BLOCK_TIMEOUT_MS),
                    Map.of(streamKey, lastSeenId),
                ) ?: continue

            for (stream in messages) {
                for (entry in stream.value) {
                    lastSeenId = entry.id
                    // Entries without a `downtime` field, or with a non-numeric one, are skipped
                    // so that a single malformed message cannot stop the listener.
                    val downtime = entry.fields["downtime"]?.toDoubleOrNull() ?: continue
                    if (downtime > 0) {
                        println("ID: ${entry.id}")
                    }
                }
            }
        }
    }

    /**
     * Returns the setting called [name], failing fast with a clear message when it is absent
     * rather than letting the literal text `null` end up in a URL or stream key.
     */
    private fun requiredProperty(name: String): String =
        checkNotNull(properties.getProperty(name)) {
            "Missing '$name' setting in $CONFIG_RESOURCE"
        }

    public companion object {
        /** Classpath location of the configuration file. */
        private const val CONFIG_RESOURCE = "/subscriber.toml"

        /** Maximum time, in milliseconds, a single `XREAD` waits for new entries. */
        private const val BLOCK_TIMEOUT_MS = 1_000

        private var instance: Redis? = null

        /**
         * Returns the shared [Redis] instance, creating it on first use.
         * No synchronization is performed here.
         */
        public fun getInstance(): Redis? = instance ?: Redis().also { instance = it }

        /** Reads [CONFIG_RESOURCE] from the classpath into a [Properties] object. */
        private fun loadProperties(): Properties {
            val resource =
                checkNotNull(Redis::class.java.getResource(CONFIG_RESOURCE)) {
                    "Configuration resource $CONFIG_RESOURCE not found on the classpath"
                }
            return TomlMapper().readerFor(Properties::class.java).readValue(resource)
        }
    }
}