Issue
I want to send this simple PySpark DataFrame to Kafka, but I always get an error. I tried it with a simple Python producer script and it works, and the PySpark read stream also works; my only problem is the write to Kafka using PySpark. Can anyone help me, please?
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaSinkExample") \
    .getOrCreate()

# Sample data
data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
df = spark.createDataFrame(data, ["key", "value"])

# Write data to Kafka
(df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "test")
    .save())
This is my docker-compose file:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_PYTHON=python3
      - PYSPARK_DRIVER_PYTHON=python3
      - SPARK_HOME=/usr/local/spark
    volumes:
      - ./notebooks:/home/jovyan/work # Mount your notebooks folder or adjust as needed
The error that I get:
23/12/25 16:50:56 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "/home/jovyan/preprocessing/bing.py", line 23, in <module>
.save())
^^^^^^
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
self._jwrite.save()
File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (0f08baeba383 executor driver): java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$4()'
Solution
I had the same problem. If you use the latest PySpark version, 3.5.0, you should change your packages line to:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
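The underlying cause is that the `NoSuchMethodError` appears when the Kafka connector JARs were built for a different Spark version than the one actually running. A small sketch of how to keep the two in sync by deriving the `--packages` string from the installed Spark version (the helper name `kafka_submit_args` is mine, not from the original post):

```python
import os

def kafka_submit_args(spark_version: str) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value whose Kafka connector
    version matches the given Spark version."""
    pkg = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version}"
    return f"--packages {pkg} pyspark-shell"

# In a real script, the version would come from the installed runtime:
#   import pyspark
#   version = pyspark.__version__
version = "3.5.0"
os.environ['PYSPARK_SUBMIT_ARGS'] = kafka_submit_args(version)
```

Note that the Scala suffix (`_2.12`) must also match the Scala version your Spark distribution was built with.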
Answered By - Waleed saeed