Issue
Something puzzles me about keeping consumers alive. Say I have a topic to which data is constantly being written, except that for about an hour each day no new messages arrive. If I set a timeout on my consumers, they are closed once no new messages arrive.
Then new messages arrive, but there are no consumers alive to consume them.
How should I handle such scenarios? My consumers may consume all the messages and then get closed. What is the best way to keep them alive? Is there any way to start them automatically when new messages arrive? What are the best practices here?
Solution
Why not just keep the consumer alive in an endless poll loop:
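```python
import time

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

try:
    while True:
        try:
            # Block for up to 10 seconds waiting for the next message
            message = consumer.poll(10.0)
            if message is None:
                # Topic is idle: back off, then poll again instead of exiting
                time.sleep(120)  # Sleep for 2 minutes
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            print(f"Received message: {message.value().decode('utf-8')}")
        except Exception as exc:
            # Handle (e.g. log) any per-message exception here and keep consuming
            print(f"Unexpected error: {exc}")
except KeyboardInterrupt:
    # Ctrl+C stops the loop
    pass
finally:
    # Close once, on shutdown, so the consumer commits offsets and leaves the group cleanly
    consumer.close()
    print("Goodbye")
```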
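The control flow of a poll loop like this can be checked without a running broker. The sketch below is a stand-in, not the confluent_kafka API: `FakeConsumer`, `FakeMessage`, `run` and `should_stop` are hypothetical names, and the fakes only mimic `poll()`, `close()`, `error()` and `value()`. It shows that empty polls (an idle topic) do not end the loop, that a message arriving after an idle period is still consumed, and that `close()` runs exactly once, on shutdown.

```python
from typing import Callable, List, Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message (only error() and value())."""

    def __init__(self, value: Optional[bytes] = None, error: Optional[str] = None):
        self._value = value
        self._error = error

    def error(self) -> Optional[str]:
        return self._error

    def value(self) -> Optional[bytes]:
        return self._value


class FakeConsumer:
    """Hypothetical stand-in that replays a scripted sequence of poll() results."""

    def __init__(self, script: List[Optional[FakeMessage]]):
        self._script = list(script)
        self.close_calls = 0

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        # Once the script is exhausted, behave like an idle topic: return None
        return self._script.pop(0) if self._script else None

    def close(self) -> None:
        self.close_calls += 1


def run(consumer, handle: Callable[[bytes], None],
        should_stop: Callable[[], bool], poll_timeout: float = 10.0) -> None:
    """Poll until should_stop() is true; idle polls never end the loop."""
    try:
        while not should_stop():
            message = consumer.poll(poll_timeout)
            if message is None:
                continue  # nothing arrived within poll_timeout; keep waiting
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            handle(message.value())
    finally:
        consumer.close()  # runs once, when the loop is told to stop


received: List[str] = []
polls = {"n": 0}


def stop_after_ten_polls() -> bool:
    # Hypothetical shutdown condition standing in for a signal or admin request
    polls["n"] += 1
    return polls["n"] > 10


# Idle polls (None) surround the messages, mimicking a quiet hour on the topic
script = [None, None, FakeMessage(b"first"), FakeMessage(error="broker down"),
          None, FakeMessage(b"second")]
fake = FakeConsumer(script)
run(fake, lambda v: received.append(v.decode("utf-8")), stop_after_ten_polls)
print(received, fake.close_calls)
```

In a real deployment the shutdown condition would typically come from a signal handler rather than a poll counter; the counter only keeps the sketch finite.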
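I cannot comment on the requirement of "setting a timeout for consumers", but in most cases consumers are meant to run "forever". They should also be deployed as members of a consumer group (several instances sharing the same 'group.id') so that they are highly available: if one instance fails, its partitions are reassigned to the remaining members.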
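A rough way to picture why group membership helps: the topic's partitions are divided among the live members of a group, and when a member leaves, the group rebalances so the survivors pick up its partitions. The toy model below is a simplified round-robin deal, not Kafka's actual rebalance protocol; `round_robin_assign`, the six partitions and the member names are hypothetical. It shows that no partition is left without a reader after one member disappears.

```python
from typing import Dict, List


def round_robin_assign(partitions: List[int], members: List[str]) -> Dict[str, List[int]]:
    """Toy model: deal partitions out to group members in turn."""
    if not members:
        return {}
    ordered = sorted(members)
    assignment: Dict[str, List[int]] = {m: [] for m in ordered}
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


partitions = list(range(6))
before = round_robin_assign(partitions, ["consumer-a", "consumer-b", "consumer-c"])
# consumer-b crashes or is shut down; the group rebalances over the survivors
after = round_robin_assign(partitions, ["consumer-a", "consumer-c"])
print(before)  # {'consumer-a': [0, 3], 'consumer-b': [1, 4], 'consumer-c': [2, 5]}
print(after)   # {'consumer-a': [0, 2, 4], 'consumer-c': [1, 3, 5]}
```

In practice this corresponds to running several copies of the consumer script with the same 'group.id' value; the broker-side group coordinator, not application code, performs the reassignment.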
Answered By - Giorgos Myrianthous