Issue
Use following code as an example. The main idea is having one thread running the sync code and submit an async task to the main thread (the event loop).
import asyncio
import concurrent.futures
from time import sleep
loop = asyncio.get_event_loop()
async def a():
print('a')
# simulate IO-bound async code
await asyncio.sleep(0.1)
print('b')
def c():
print('c')
x = loop.create_task(a())
# simulate CPU-bound code
sleep(1)
print('d')
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = loop.run_in_executor(executor, c)
loop.run_forever()
I expect to see output of
c
a
[wait 0.1 second]
b
[wait 0.9 second]
d
but I actually see
c
[wait 1 second]
d
a
[wait 0.1 second]
b
Based on my understanding the main thread/loop shouldn't block by the run_in_executor future and should start run the async function upon submitted.
Tested on Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Any one can help explain the behavior? Is there any way I can trigger async code from sync code?
Thanks!
Solution
The problem is that create_task
is not thread_safe: Despite no corrupt data taking place, because the sets, dicts and other structures used internally by asyncio are guarded by GIL, the loop does not expect for new things for it to do to show up when it was just awaiting for its solely thread-safe task, which was the run_in_executor
call.
Using the thread-safe calls provided in asyncio will do the correct things internally. This code works as you expect:
import asyncio
import concurrent.futures
from time import sleep, time
_ellapsed_counter = time()
def ellapsed():
global _ellapsed_counter
tmp = time() - _ellapsed_counter
_ellapsed_counter = time()
return f"[ellapsed {tmp:.02f}s]\n"
loop = asyncio.get_event_loop()
async def a():
print(ellapsed(), 'a')
# simulate IO-bound async code
await asyncio.sleep(0.1)
print(ellapsed(), 'b')
async def helper():
loop.create_task(a())
def c():
print(ellapsed(), 'c')
x = asyncio.run_coroutine_threadsafe(helper(), loop)
# simulate CPU-bound code
sleep(1)
print(ellapsed(), 'd')
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = loop.run_in_executor(executor, c)
loop.run_forevever()
Answered By - jsbueno
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.