blob: 23b8c2c495306e8fbd9351e1bac26ec1c2f14fcc (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import redis
from redis import Redis
from redis_streams.consumer import Consumer
# Name of the Redis stream to read from and the consumer group this
# script joins when reading it.
STREAM = "postgres_topic"
GROUP = "python_consumer"

# The connection is created with ``decode_responses=True`` so that replies
# come back as ``str`` instead of ``bytes``; the original note marks this
# setting as crucial for the consumer.
redis_conn = Redis(decode_responses=True)

consumer = Consumer(
    redis_conn=redis_conn,
    stream=STREAM,
    consumer_group=GROUP,
    # NOTE(review): presumably get_items() returns once 10 items are
    # available or 30 s have elapsed, whichever comes first -- confirm
    # against the redis_streams documentation.
    batch_size=10,
    max_wait_time_ms=30000,
)

# Consume forever: fetch a batch, report each message, then remove it from
# the consumer group so it is not handed out again.
while True:
    messages = consumer.get_items()
    total_no_of_messages = len(messages)
    # Count from 1 so the progress reads "1/N" .. "N/N" rather than
    # "0/N" .. "N-1/N".
    for position, item in enumerate(messages, start=1):
        print(f"Processing {position}/{total_no_of_messages} message:{item}")
        consumer.remove_item_from_consumer_group(item_id=item.msgid)
|