summaryrefslogtreecommitdiff
path: root/python_scripts/monitor.py
blob: 23b8c2c495306e8fbd9351e1bac26ec1c2f14fcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import redis
from redis import Redis
from redis_streams.consumer import Consumer

STREAM="postgres_topic"
GROUP="python_consumer"

# It is crucial to enable "decode_response" feature of Redis
redis_conn = Redis(decode_responses=True)
consumer = Consumer(
        redis_conn=redis_conn,
        stream=STREAM,
        consumer_group=GROUP,
        batch_size=10,
        max_wait_time_ms=30000
    )


while True:
    messages = consumer.get_items()
    total_no_of_messages = len(messages)
    for i, item in enumerate(messages):
        print(f"Pocessing {i}/{total_no_of_messages} message:{item}")
        consumer.remove_item_from_consumer_group(item_id=item.msgid)