Issue
I am running into intermittent timeouts and "Python worker failed to connect back" errors when using mapInPandas; the script below reproduces the problem. If I run the script several times in succession, it sometimes alternates between working and failing; other times it fails repeatedly.
# reproduce_error.py
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
import pandas as pd
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from typing import Iterator
from pyspark.sql.types import (
    StructType, StructField, StringType)

print('environment:')
import platform  # only need this for showing environment
print(f'OS: {platform.system()} {platform.release()}')
print('Python:', sys.version)
print('pandas:', pd.__version__)
print('pyspark:', pyspark.__version__)
for k in [
        'JAVA_HOME', 'HADOOP_HOME', 'SPARK_HOME',
        'PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON', 'PATH']:
    print(f'{k}: {os.environ[k]}')

# start spark
conf = SparkConf().setAll([
    ('spark.sql.execution.arrow.pyspark.enabled', 'true'),
    ('spark.sql.shuffle.partitions', '1')
])
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()

# create input
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True)
])
inpt = spark_session.createDataFrame(
    [('A', 'B'), ('C', 'D')], schema=schema)

# confirm can convert to pandas dataframe as a sanity check
print(inpt.toPandas())

# apply pandas udf that does nothing
def pandas_udf_noop(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf

outpt = inpt.mapInPandas(
    pandas_udf_noop, schema=schema)
outpt.show()  # error occurs at this line

# tear down
spark_session.stop()
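For what it's worth, a pure-JVM comparison like the following (a sketch that could be added just before the mapInPandas call; it never launches a Python worker) helps confirm that only operations needing a Python worker hit the error:

# comparison (sketch): same rows, no Python worker involved
outpt_jvm_only = inpt.select('col1', 'col2')  # plain DataFrame plan, no pandas UDF
outpt_jvm_only.show()  # runs entirely in the JVM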
Additional context:
- The above is a minimal example that tries to reproduce an error I'm getting in more complex/longer code involving pandas UDFs with mapInPandas. That code had been reliably working and passing tests for months on the same virtual machine. With the real code I am also seeing "Exception: could not open socket: ["tried to connect to ('127.0.0.1', 56838), but an error occurred: [WinError 10061] No connection could be made because the target machine actively refused it"]", but I have not been able to reproduce that with any simple example I tried. I will leave that for another post, but mention it in case it gives someone a clue about what's going on.
- I am not running this on my personal computer; I log into a virtual machine at work via RDP and run it there. The machine has a firewall and security software running.
- I am using the same environment variables (JAVA_HOME, SPARK_HOME, HADOOP_HOME, PYSPARK_PYTHON, PYSPARK_DRIVER_PYTHON) with which my code previously worked, so they are probably not the root cause (although I'm open to changing them).
- Between when my code last worked/passed tests and now, IT did some updates on this virtual machine: installing a newer version of Java, updating the security software, and applying Windows updates. The Java version I was using before is still installed, and JAVA_HOME still points to it. (A small sanity-check sketch for the Java/Python the session actually uses follows this list.)
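Here is the sanity-check sketch mentioned above (assumes a Spark session can be created; _jvm is an internal py4j handle, so treat this purely as a diagnostic). It reports which Java the driver JVM actually picked up and which Python the workers will be told to use:

# sanity_checks.py (diagnostic sketch)
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark.sparkContext._jvm  # py4j view into the driver JVM (internal API)

print('java.version:', jvm.System.getProperty('java.version'))
print('java.home:', jvm.System.getProperty('java.home'))
print('PYSPARK_PYTHON:', os.environ.get('PYSPARK_PYTHON'))
print('PYSPARK_DRIVER_PYTHON:', os.environ.get('PYSPARK_DRIVER_PYTHON'))

spark.stop()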
Here is the output from a time the script above errored:
C:\Users\[me]\Documents\reproduce_spark_udf_error>SET JAVA_HOME=C:\Program Files\Java\jdk1.8.0_211
C:\Users\[me]\Documents\reproduce_spark_udf_error>SET SPARK_HOME=C:\spark\spark-3.1.2-bin-hadoop3.2
C:\Users\[me]\Documents\reproduce_spark_udf_error>SET HADOOP_HOME=C:\hadoop\hadoop-3.2.0
C:\Users\[me]\Documents\reproduce_spark_udf_error>SET PATH=%JAVA_HOME%\bin;%HADOOP_HOME%\bin;%PATH%
C:\Users\[me]\Documents\reproduce_spark_udf_error>python reproduce_error.py
environment:
OS: Windows 10
Python: 3.8.5 (default, Sep 3 2020, 21:29:08) [MSC v.1916 64 bit (AMD64)]
pandas: 1.1.3
pyspark: 3.1.2
JAVA_HOME: C:\Program Files\Java\jdk1.8.0_211
HADOOP_HOME: C:\hadoop\hadoop-3.2.0
SPARK_HOME: C:\spark\spark-3.1.2-bin-hadoop3.2
PYSPARK_PYTHON: C:\ProgramData\Anaconda3\python.exe
PYSPARK_DRIVER_PYTHON: C:\ProgramData\Anaconda3\python.exe
PATH: C:\Program Files\Java\jdk1.8.0_211\bin;C:\hadoop\hadoop-3.2.0\bin;C:\ProgramData\Anaconda3;C:\ProgramData\Anaconda3\Library\mingw-w64\bin;C:\ProgramData\Anaconda3\Library\usr\bin;C:\ProgramData\Anaconda3\Library\bin;C:\ProgramData\Anaconda3\Scripts;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files (x86)\Microsoft SQL Server\150\DTS\Binn\;C:\Program Files\TortoiseSVN\bin;C:\msys64\mingw64\bin;C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\130\Tools\Binn\;C:\Program Files (x86)\Microsoft SQL Server\130\Tools\Binn\;C:\Program Files\Microsoft SQL Server\130\Tools\Binn\;C:\Program Files\Microsoft SQL Server\130\DTS\Binn\;C:\Users\[me]\AppData\Local\Microsoft\WindowsApps;
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/26 16:11:36 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
  col1 col2
0    A    B
1    C    D
22/07/26 16:13:07 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 8)
org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.MapInPandasExec.$anonfun$doExecute$1(MapInPandasExec.scala:81)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 20 more
22/07/26 16:13:07 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 8) ([vm url] executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.MapInPandasExec.$anonfun$doExecute$1(MapInPandasExec.scala:81)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 20 more
22/07/26 16:13:07 ERROR TaskSetManager: Task 1 in stage 2.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "reproduce_error.py", line 48, in <module>
    outpt.show()
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 484, in show
    print(self._jdf.showString(n, 20, vertical))
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o71.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 8) ([vm url] executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.MapInPandasExec.$anonfun$doExecute$1(MapInPandasExec.scala:81)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 20 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.MapInPandasExec.$anonfun$doExecute$1(MapInPandasExec.scala:81)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 20 more

C:\Users\[me]\Documents\reproduce_spark_udf_error>ERROR: The process "2796" not found.
Solution
The issue was caused by the security software we are running.
It seems the real-time file scanning was either blocking some Python workers from launching or slowing them down enough that they failed to connect back to the executor's socket within the hard-coded 10-second accept window (see PythonWorkerFactory.scala).
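A rough way to sanity-check this theory on the affected machine (just a sketch, not part of the original diagnosis): time how long a fresh Python process takes to start and import pyspark.worker, the module Spark runs in each worker. This is only a proxy for the real worker launch, but a slow cold start here is consistent with on-access scanning delaying worker startup.

# time_worker_startup.py (diagnostic sketch)
import subprocess
import sys
import time

for attempt in range(5):
    start = time.perf_counter()
    # launch a fresh interpreter and import the module Spark runs in each Python worker
    subprocess.run([sys.executable, '-c', 'import pyspark.worker'], check=True)
    print(f'attempt {attempt}: {time.perf_counter() - start:.1f}s '
          f'(the worker must connect back within roughly 10s)')

If these cold starts approach or exceed 10 seconds while the scanner is active, and drop after the Python and Spark directories are excluded from real-time scanning, that points at the same culprit.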
Answered By - A Hood