summaryrefslogtreecommitdiff
path: root/python_scripts/monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_scripts/monitor.py')
-rw-r--r--python_scripts/monitor.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/python_scripts/monitor.py b/python_scripts/monitor.py
new file mode 100644
index 00000000..23b8c2c4
--- /dev/null
+++ b/python_scripts/monitor.py
@@ -0,0 +1,24 @@
+import redis
+from redis import Redis
+from redis_streams.consumer import Consumer
+
+STREAM="postgres_topic"
+GROUP="python_consumer"
+
+# It is crucial to enable "decode_response" feature of Redis
+redis_conn = Redis(decode_responses=True)
+consumer = Consumer(
+ redis_conn=redis_conn,
+ stream=STREAM,
+ consumer_group=GROUP,
+ batch_size=10,
+ max_wait_time_ms=30000
+ )
+
+
+while True:
+ messages = consumer.get_items()
+ total_no_of_messages = len(messages)
+ for i, item in enumerate(messages):
+ print(f"Pocessing {i}/{total_no_of_messages} message:{item}")
+ consumer.remove_item_from_consumer_group(item_id=item.msgid)