Issue
I have two notebooks. The first notebook reads tweets from Twitter using tweepy and writes them to a socket. The other notebook reads the tweets from that socket using Spark Structured Streaming (Python) and writes its result to the console. Unfortunately, I'm not getting any output in Jupyter, although the same code works fine in PyCharm.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreaming") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Spark Structured Streaming code that reads the tweets from the socket and prints them to the console.
tweets = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 7000) \
.load()
query = tweets \
.writeStream \
.option("truncate", "false") \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
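To reproduce the setup without Twitter credentials, the first notebook can be replaced by a small TCP server. Spark's socket source connects to host:port as a client and turns every newline-terminated line it receives into one row of a single value column, so the server only has to accept that connection and write lines. This is a sketch: make_server and send_lines are hypothetical helper names, and you would pass your tweet texts instead of sample strings.

```python
import socket
from typing import Iterable


def make_server(host: str = "127.0.0.1", port: int = 7000) -> socket.socket:
    """Create a listening TCP socket for Spark's socket source to connect to."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_lines(server: socket.socket, lines: Iterable[str]) -> None:
    """Accept one client and send each item as a single newline-terminated line."""
    conn, _ = server.accept()  # blocks until readStream.format("socket") connects
    with conn:
        for line in lines:
            # The socket source splits rows on newlines, so flatten embedded ones.
            conn.sendall((line.replace("\n", " ") + "\n").encode("utf-8"))
```

The server has to be listening before the streaming query starts, because the socket source connects once when the query starts: call make_server() in the first notebook, run send_lines there (it waits in accept()), and only then start the query in the second notebook.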
Solution
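As far as I know, the console sink output won't show up in the notebook: it is printed by the Spark driver's JVM, so it typically ends up in the terminal where Jupyter was started rather than in the cell. However, you can use the memory output sink to achieve similar results. It works with both the complete and the append output modes; which one fits depends on whether your query aggregates.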
For the complete mode
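In complete output mode, the query must contain a streaming aggregation (Spark rejects complete mode otherwise), and the whole result table is rewritten on every trigger. For example, counting the lines per distinct text (the socket source's only column is value), your query should look more or less as follows:
query = tweets \
.groupBy("value") \
.count() \
.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("your_query_name") \
.start()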
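Notice that there's no query.awaitTermination() at the end: in a notebook it would block the cell (and the kernel) until the query stops, so the cells that read the results could never run. When you're done, stop the query with query.stop().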
Now query the your_query_name in-memory table from another cell and watch the continuously updated results for as long as you want:
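from time import sleep
from IPython.display import display, clear_output

# Refresh the view once per second; interrupt the kernel to stop watching.
while True:
    clear_output(wait=True)
    display(query.status)
    # show() prints the table itself and returns None, so it is not wrapped in display()
    spark.sql('SELECT * FROM your_query_name').show(truncate=False)
    sleep(1)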
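The loop above runs until you interrupt the kernel. If you'd rather have it stop by itself when the query ends, or after a fixed number of refreshes, the same idea can be wrapped in a small helper. watch is a hypothetical name; it takes the clearing and rendering steps as callables, so the helper itself depends on neither Spark nor IPython.

```python
import time
from typing import Callable, Optional


def watch(render: Callable[[], None],
          clear: Callable[[], None],
          is_active: Callable[[], bool],
          interval: float = 1.0,
          max_iterations: Optional[int] = None) -> int:
    """Clear and re-render every `interval` seconds while is_active() is True.

    Returns the number of refreshes. A KeyboardInterrupt (Jupyter's
    "Interrupt kernel") ends the loop quietly instead of raising.
    """
    count = 0
    try:
        while is_active() and (max_iterations is None or count < max_iterations):
            clear()
            render()
            count += 1
            time.sleep(interval)
    except KeyboardInterrupt:
        pass
    return count

# Example use in a notebook cell (assumes `spark` and `query` from above):
# from IPython.display import clear_output
# watch(render=lambda: spark.sql("SELECT * FROM your_query_name").show(truncate=False),
#       clear=lambda: clear_output(wait=True),
#       is_active=lambda: query.isActive)
```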
For the append mode
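Append is the default output mode, and for a plain stream like the raw tweets it needs no extra setup: each new row is added to the table once. Watermarks only come in when you aggregate. In append mode, aggregations are allowed only on an event-time column with a watermark (for example, counts per time window), because Spark must know when a group's result is final before appending it, so your code might require some further changes.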
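# "timestampColumn" is a placeholder for your event-time column. The socket source
# only provides one (named "timestamp") when read with .option("includeTimestamp", True).
# The watermark only matters once you add an event-time aggregation.
query = tweets \
.withWatermark("timestampColumn", "3 minutes") \
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("your_query_name") \
.start()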
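To make the watermark rule concrete, here is a toy pure-Python model (not Spark code) of append mode with windowed counts. Event times are plain integers, the watermark is the largest event time seen minus the delay and never moves backwards, a window is appended exactly once when the watermark passes its end, and rows belonging to an already finalized window are dropped as late. As a simplification it applies the new watermark in the same batch, whereas Spark uses the watermark computed after the previous micro-batch, so real output can lag by one trigger. append_mode_counts is a hypothetical name.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple


def append_mode_counts(batches: List[List[int]], window: int,
                       delay: int) -> List[List[Tuple[int, int]]]:
    """Return, per micro-batch, the (window_start, count) pairs appended to the sink."""
    counts: Dict[int, int] = defaultdict(int)  # open windows: start -> running count
    watermark = -math.inf
    appended: List[List[Tuple[int, int]]] = []
    for batch in batches:
        for event_time in batch:
            start = event_time - event_time % window  # tumbling window start
            if start + window <= watermark:
                continue  # late row: its window was already finalized and appended
            counts[start] += 1
        if batch:
            # The watermark only ever moves forward.
            watermark = max(watermark, max(batch) - delay)
        # Append mode emits a window once, after the watermark has passed its end.
        ready = sorted(s for s in counts if s + window <= watermark)
        appended.append([(s, counts.pop(s)) for s in ready])
    return appended
```

With window=10 and delay=5, the batches [[1, 3, 12], [25], [4, 31]] append nothing, then the windows starting at 0 (2 rows) and 10 (1 row), then nothing: the late event 4 is dropped because its window was already final.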
The code for display stays the same.
You can also display query.lastProgress for more detailed information in a similar fashion.
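query.lastProgress holds the metrics of the most recent micro-batch; in the PySpark versions I've used it comes back as a dict, or None before the first batch completes. To keep the refreshing output short, a hypothetical summarize_progress helper can reduce it to one line. It assumes only the batchId, numInputRows, processedRowsPerSecond and timestamp keys and tolerates missing ones.

```python
from typing import Any, Mapping, Optional


def summarize_progress(progress: Optional[Mapping[str, Any]]) -> str:
    """One-line summary of a StreamingQuery.lastProgress-style mapping."""
    if not progress:
        return "no progress reported yet"
    rate = progress.get("processedRowsPerSecond")
    rate_str = f"{rate:.1f}" if isinstance(rate, (int, float)) else "n/a"
    return (f"batch {progress.get('batchId')}: "
            f"{progress.get('numInputRows', 0)} rows, "
            f"{rate_str} rows/s processed, "
            f"triggered at {progress.get('timestamp')}")

# In the display loop, e.g.: print(summarize_progress(query.lastProgress))
```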
Inspirations and references
- How to get the output from console streaming sink in Zeppelin?
- Overwrite previous output in jupyter notebook
Answered By - Dominik Filipiak