From ca0c72789ba87791829001d259000b2966d9d6e4 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Fri, 19 Jun 2026 10:57:10 +0200 Subject: feat: added additional capability to HTTPClient --- python_scripts/__init__.py | 0 python_scripts/__main__.py | 19 ++++++++++++++++++ python_scripts/__pycache__/redis.cpython-314.pyc | Bin 0 -> 932 bytes python_scripts/monitor.py | 24 +++++++++++++++++++++++ 4 files changed, 43 insertions(+) create mode 100644 python_scripts/__init__.py create mode 100644 python_scripts/__main__.py create mode 100644 python_scripts/__pycache__/redis.cpython-314.pyc create mode 100644 python_scripts/monitor.py (limited to 'python_scripts') diff --git a/python_scripts/__init__.py b/python_scripts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python_scripts/__main__.py b/python_scripts/__main__.py new file mode 100644 index 00000000..9c375524 --- /dev/null +++ b/python_scripts/__main__.py @@ -0,0 +1,19 @@ +import uvicorn +from typing import Dict, Any +from fastapi import FastAPI + +PORT = 1234 +HOST = "localhost" +APP = FastAPI() + +@APP.post("/assets") +async def assets(payload: Dict[Any, Any]): + return payload, 200 + + +@APP.get("/check") +async def check(): + return 200 + +if __name__ == "__main__": + uvicorn.run(APP, host=HOST, port=PORT) diff --git a/python_scripts/__pycache__/redis.cpython-314.pyc b/python_scripts/__pycache__/redis.cpython-314.pyc new file mode 100644 index 00000000..5b2aaa57 Binary files /dev/null and b/python_scripts/__pycache__/redis.cpython-314.pyc differ diff --git a/python_scripts/monitor.py b/python_scripts/monitor.py new file mode 100644 index 00000000..23b8c2c4 --- /dev/null +++ b/python_scripts/monitor.py @@ -0,0 +1,24 @@ +import redis +from redis import Redis +from redis_streams.consumer import Consumer + +STREAM="postgres_topic" +GROUP="python_consumer" + +# It is crucial to enable "decode_response" feature of Redis +redis_conn = Redis(decode_responses=True) +consumer = Consumer( + redis_conn=redis_conn, + stream=STREAM, + consumer_group=GROUP, + batch_size=10, + max_wait_time_ms=30000 + ) + + +while True: + messages = consumer.get_items() + total_no_of_messages = len(messages) + for i, item in enumerate(messages): + print(f"Pocessing {i}/{total_no_of_messages} message:{item}") + consumer.remove_item_from_consumer_group(item_id=item.msgid) -- cgit v1.2.3