Issue
I set up a 3-node Kafka cluster with docker-compose, then created 5 topics, each with 3 partitions and a replication factor of 3. I configured the producers to connect to the port of each node.
Messages arrive in order (as they should), but after checking my cluster with a UI I realised that all the messages of all topics are going to the same partition (partition #2).
At first, I thought it might be because I had not set a partition key for the messages, so I modified my script to add a partition key to every message (a combination of the first two letters of the topic and the id number of the tweet; does this partition key format make any sense, though?), but the problem persists.
This is the code (it receives tweets from the Twitter API v2 and sends messages with the producer):
from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient

# Getting credentials:
load_dotenv()  # load variables from a local .env file into the environment
BEARER_TOKEN = os.getenv("BEARER_TOKEN")

# Setting up pykafka:
def get_kafka_client():
    return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')

def send_message(data, name_topic, id):
    # Note: a new client and producer are created for every message
    client = get_kafka_client()
    topic = client.topics[name_topic]
    producer = topic.get_sync_producer()
    producer.produce(data, partition_key=f"{name_topic[:2].upper()}{id}".encode())

# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
    def on_data(self, data):
        print(data)
        message = json.loads(data)
        # Send the tweet to one topic per matching rule (the rule tag is the topic name)
        for rule in message['matching_rules']:
            send_message(data, rule['tag'], message['data']['id'])
        return True

    def on_error(self, status):
        print(status)

# Start streaming:
Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])
I thought that without any given key it would start sending the messages to the three partitions randomly, but it didn't do that either. I don't know where the problem might be.
In case it is relevant, all 5 topics were created in docker-compose using this format:
docker-compose exec kafka1 kafka-topics --bootstrap-server kafka1:19092 --create --replication-factor 3 --partitions 3 --topic NoFlyZone
Solution
If no key is given, the producer should spread messages across multiple partitions. If you give a key, the partition is derived from a hash of that key, so you run the risk that different keys still hash to the same partition.
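To make the hashing point concrete, here is a minimal sketch of hash-modulo partitioning. It is an illustration only: CRC32 is an assumed stand-in hash, not necessarily what PyKafka or any other Kafka client uses, and `partition_for`, `distribution`, and the sample keys are hypothetical names invented for this example.

```python
import zlib
from collections import Counter


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index with hash-modulo.

    CRC32 is an illustrative choice; real clients may use a different hash.
    """
    return zlib.crc32(key) % num_partitions


def distribution(keys, num_partitions):
    """Count how many keys land on each partition."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# Hypothetical keys in the question's format: two topic letters plus a tweet id.
keys = [f"NO{tweet_id}".encode() for tweet_id in range(1000, 1010)]
counts = distribution(keys, 3)

for partition in range(3):
    print(partition, counts.get(partition, 0))
```

With 10 distinct keys and only 3 partitions, at least one partition must receive several keys, so distinct keys alone do not guarantee distinct partitions. The same key always maps to the same partition, which is what preserves per-key ordering.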
You may want to test with other libraries such as kafka-python or confluent-kafka-python, since PyKafka is no longer maintained.
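As a rough sketch of what such a test could look like with kafka-python, the snippet below reuses the key format and broker ports from the question. The helper names (`make_producer`, `build_key`, `send_tweet`, `main`) are hypothetical, and the code assumes `pip install kafka-python` and a cluster reachable on the listed ports. It has not been run against the cluster described above.

```python
import json


def make_producer(bootstrap_servers):
    """Create one long-lived kafka-python producer to reuse for all messages."""
    # Imported lazily so the helpers below can be used without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(bootstrap_servers=bootstrap_servers)


def build_key(topic_name, tweet_id):
    """Key format from the question: first two topic letters plus the tweet id."""
    return f"{topic_name[:2].upper()}{tweet_id}".encode()


def send_tweet(producer, raw_tweet):
    """Send one raw tweet (JSON str or bytes) to the topic of each matching rule."""
    message = json.loads(raw_tweet)
    value = raw_tweet if isinstance(raw_tweet, bytes) else raw_tweet.encode()
    sent = []
    for rule in message["matching_rules"]:
        topic = rule["tag"]  # the rule tag doubles as the topic name
        key = build_key(topic, message["data"]["id"])
        producer.send(topic, key=key, value=value)
        sent.append((topic, key))
    return sent


def main():
    """Example wiring; call this with the brokers running."""
    producer = make_producer(["localhost:9092", "localhost:9093", "localhost:9094"])
    sample = json.dumps({"data": {"id": "1"}, "matching_rules": [{"tag": "NoFlyZone"}]})
    print(send_tweet(producer, sample))
    producer.flush()  # block until buffered messages have been sent
```

Unlike the original `send_message`, this sketch creates the producer once and passes it in. To compare behaviour without keys, drop the `key=` argument from `producer.send` and check the partition distribution in the UI again.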
Answered By - OneCricketeer