Issue
I am trying to consume from a topic and append every message that comes out of it to a list, but unfortunately the list is never printed at the end.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp'
)
print(consumer)
data_list=[]
try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    pass
print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()
The output is as below:
<kafka.consumer.group.KafkaConsumer object at 0x7f1c3379c9d0>
Any comments?
Solution
The code seems okay. Could you add some logging or print statements inside the loop, as below, to see whether messages are arriving or exceptions are being raised? This can help you identify any issues during message consumption.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp'
)
print(consumer)
data_list = []
try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        print(f"Received message: {data}")
        consumer.commit()
except KeyboardInterrupt:
    pass
finally:
    pass
print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()
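If the concern is that a malformed payload raises inside `value_deserializer` and stops the loop, one option is to log the failure instead of letting it propagate. The following is only a sketch: `safe_json_deserializer` and the logger name are hypothetical names chosen for illustration, and it assumes that skipping undecodable messages is acceptable in your case.

```python
import json
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("consumer_debug")  # hypothetical logger name


def safe_json_deserializer(raw):
    """Decode a message value as UTF-8 JSON; log and return None on failure."""
    if raw is None:
        # Messages can carry a null value, so there is nothing to decode.
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        log.warning("Could not decode message value %r: %s", raw, exc)
        return None
```

It can be passed in place of the lambda, i.e. `value_deserializer=safe_json_deserializer`; the loop would then need to skip entries where `message.value` is `None`.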
-----------UPDATE---------
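Try the logic below: the loop breaks after consuming the specified number of messages (max_messages). By default, iterating over a KafkaConsumer keeps waiting for new messages, so the code after the loop is not reached until the loop terminates. This way, you should be able to see the printed output outside the loop once the loop ends. Adjust the condition according to your needs.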
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp'
)
print(consumer)
data_list = []
max_messages = 100  # Set a limit on the number of messages to consume
try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        print(f"Received message: {data}")
        consumer.commit()
        # Break out of the loop after consuming a certain number of messages
        if len(data_list) >= max_messages:
            break
except KeyboardInterrupt:
    pass
finally:
    pass
print(f"Processed messages: {len(data_list)}")
print(data_list)
consumer.close()
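A fixed message count never triggers the break if the topic holds fewer messages than the limit. As a possible complement, kafka-python's `consumer_timeout_ms` setting ends the iteration once no message has arrived for the given time. The sketch below is one way to combine the two; the helper names `collect_messages` and `make_consumer` and the 5000 ms value are assumptions for illustration, not part of the original answer.

```python
import json


def collect_messages(consumer, max_messages=100):
    """Collect message values until the consumer stops yielding
    or max_messages have been gathered, whichever comes first."""
    data_list = []
    for message in consumer:
        data_list.append(message.value)
        if len(data_list) >= max_messages:
            break
    return data_list


def make_consumer():
    """Build a consumer whose iterator ends after 5 s without messages."""
    # Imported here so collect_messages can be tried without kafka-python.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        'random_user_fetching',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
        group_id='my_grp',
        consumer_timeout_ms=5000,  # assumed value; tune to your traffic
    )
```

With a broker running, `data_list = collect_messages(make_consumer())` followed by `print(data_list)` should return control even when the topic goes quiet. Note that `auto_offset_reset='earliest'` only applies when the group has no committed offsets, so a group that has already committed will resume from where it left off.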
Answered By - TheHungryCub