Issue
df5.writeStream.format("jdbc").option("url", "url")\
    .option("dbtable", "test").option("user", "postgres")\
    .option("password", "password").start()
I always get the following error:
Py4JJavaError: An error occurred while calling o112.start.
: java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:311)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Solution
The error message states the reason directly: Data source jdbc does not support streamed writing.
The JDBC data source only supports batch writes, so to write data to a JDBC sink from Structured Streaming you need to use foreachBatch instead. It hands each micro-batch to your function as a regular DataFrame, which can then be written with the batch JDBC writer, something like this:
def foreach_batch_function(df, epoch_id):
    # df is the micro-batch as a regular DataFrame, so the batch JDBC writer works here
    df.write.format("jdbc").option("url", "url")\
        .option("dbtable", "test").option("user", "postgres")\
        .option("password", "password")\
        .mode("append").save()  # append, so later micro-batches don't fail on the existing table

df5.writeStream.foreachBatch(foreach_batch_function).start()
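In practice you will usually also want a checkpoint location so the stream can recover after a restart. A minimal sketch, assuming a writable checkpoint directory (the path below is just a placeholder):

query = df5.writeStream\
    .foreachBatch(foreach_batch_function)\
    .option("checkpointLocation", "/tmp/checkpoints/jdbc-sink")\
    .start()
query.awaitTermination()  # block until the streaming query stops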
Answered By - Alex Ott