Issue
I have zero knowledge of asynchronous Python other than several hours of searching Stack Overflow posts, and I am struggling to figure out what is occasionally causing the error below:
Triggerer's async thread was blocked for 0.26 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines
Setting PYTHONASYNCIODEBUG=1 is not giving me anything I am capable of understanding:
WARNING - Executing <Task pending name='Task-3' coro=<TriggerRunner.run_trigger() running at /home/xxx/.local/lib/python3.8/site-packages/airflow/jobs/triggerer_job.py:358> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2ded110e50>()] created at /usr/lib/python3.8/asyncio/base_events.py:422> created at /usr/lib/python3.8/asyncio/tasks.py:382> took 0.441 seconds
I think the blocking is happening in the code snippet below.
    async def run(self):
        while True:
            # ref: https://www.geeksforgeeks.org/how-to-use-threadpoolexecutor-in-python3/
            with ThreadPoolExecutor(max_workers=5) as exe:
                future = exe.submit(self._get_records)
                records = future.result()
            if records:
                yield TriggerEvent(True)
            await asyncio.sleep(self.poke_interval)

    def _get_records(self):
        return self.hook.get_records(self.sql)
For further reference, complete code as well as the intent of what I am trying to accomplish is here: How to write deferrable SqlSensor in Airflow?
Thanks in advance for any advice on debugging / fixing this issue.
Solution
As written, the code offloads the blocking work to another thread, which is fine, but then immediately blocks the main thread until that work completes by calling future.result(): you are using concurrent.futures with no provision for taking advantage of asyncio in the main thread. The hint that this call is blocking is that there is no await in front of it.
You can do this instead:
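    async def run(self):
        # Create the executor once, not on every loop iteration.
        exe = ThreadPoolExecutor(max_workers=5)
        # Inside a coroutine, get_running_loop() returns the loop that is executing it.
        loop = asyncio.get_running_loop()
        with exe:
            while True:
                # Awaiting here lets the event loop serve other triggers
                # while the query runs in a worker thread.
                records = await loop.run_in_executor(exe, self._get_records)
                if records:
                    yield TriggerEvent(True)
                await asyncio.sleep(self.poke_interval)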
(Also, your code was creating and shutting down the executor, along with its threads, on each iteration of the outer while loop.)
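To see the difference between the two approaches outside Airflow, here is a minimal sketch. It assumes a hypothetical slow_query function standing in for self.hook.get_records(self.sql), and a hypothetical heartbeat task that records a timestamp every 50 ms; a large gap between timestamps means the event loop was blocked.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_query():
    """Hypothetical stand-in for self.hook.get_records(self.sql)."""
    time.sleep(0.3)
    return [("row",)]


async def heartbeat(ticks):
    """Record a timestamp every 50 ms; a long gap means the loop was blocked."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


def longest_gap(ticks):
    """Largest interval between two consecutive heartbeat timestamps."""
    return max(b - a for a, b in zip(ticks, ticks[1:]))


async def measure(blocking):
    """Run slow_query once and return the longest heartbeat gap observed."""
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    with ThreadPoolExecutor(max_workers=1) as exe:
        if blocking:
            # No await: the event loop thread waits here and nothing else runs.
            exe.submit(slow_query).result()
        else:
            loop = asyncio.get_running_loop()
            # Awaited: the heartbeat keeps ticking while the thread works.
            await loop.run_in_executor(exe, slow_query)
    await asyncio.sleep(0.06)  # allow at least one more tick
    beat.cancel()
    return longest_gap(ticks)


if __name__ == "__main__":
    print("future.result() gap:", asyncio.run(measure(True)))
    print("run_in_executor gap:", asyncio.run(measure(False)))
```

With future.result() the longest gap should be roughly the duration of the query, while with run_in_executor it should stay close to the 50 ms heartbeat interval.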
As is, this offloads each call to a different thread but still won't run any calls in parallel. If you want up to 5 calls in parallel, please make that explicit, along with the limit you are trying to implement with the asyncio.sleep call, possibly in another question.
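If parallel calls are what you are after, one possible shape is sketched below. It is not tied to your trigger: fetch is a hypothetical blocking function, and query_ids and max_workers are illustrative names. Each call is submitted with run_in_executor and the results are collected with asyncio.gather, so at most max_workers calls run at once.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(query_id):
    """Hypothetical blocking call, standing in for one database query."""
    time.sleep(0.2)
    return query_id


async def fetch_all(query_ids, max_workers=5):
    """Run fetch for every id, with at most max_workers calls in parallel."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        # Each run_in_executor call returns an awaitable future immediately.
        futures = [loop.run_in_executor(exe, fetch, q) for q in query_ids]
        # gather waits for all of them and preserves the submission order.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(fetch_all(range(5))))
```

The executor's max_workers is what caps the parallelism here; extra submissions simply queue inside the pool until a worker is free.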
Answered By - jsbueno