Issue
I have synchronous methods for uploading and downloading files (represented here by a sync_wait method). I want to run them asynchronously and endlessly, with uploaders and downloaders executing in parallel at the same time forever. I achieved this as follows.
Running this code:
import time, asyncio
from functools import wraps, partial

# https://stackoverflow.com/a/50450553/3026886
def to_async(func):
    @wraps(func)
    async def run(*args, **kwargs):
        # run the blocking call in the default executor so the event loop stays free
        return await asyncio.get_event_loop().run_in_executor(None, partial(func, *args, **kwargs))
    return run

@to_async
def sync_wait(msg):
    time.sleep(msg)

async def producer(n, queue):
    while True:
        msg = .2
        await sync_wait(msg)
        print(f'{n}p')
        await queue.put(msg)

async def consumer(n, queue):
    while True:
        msg = await queue.get()
        print(f'{n}c')
        await sync_wait(msg)

async def main():
    queue = asyncio.Queue(10)
    producers = [producer(n, queue) for n in range(2)]
    consumers = [consumer(n, queue) for n in range(4)]
    await asyncio.gather(*(producers + consumers), return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())
Printed this output:
1p
0p
0c
1c
1p
2c
0p
3c
1p
2c
0p
1c
1p
0c
0p
3c
1p
2c
0p
3c
1p
0c
0p
3c
...
which makes sense, since I have 2 producers and 4 consumers interacting with my queue. My boss told me I didn't need the to_async decorator. But after removing only the decorator from the sync_wait definition, I got no prints at all. How can I explain this new behavior?
Solution
When you await something that cannot be awaited, things crash:
# python3 -m asyncio
asyncio REPL 3.9.9 (main, Jan 10 2022, 11:05:09)
[Clang 10.0.1 (clang-1001.0.46.4)] on darwin
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>> import time
>>> await time.sleep(.5) # sleeps .5 secs before failing to await
Traceback (most recent call last):
...
TypeError: object NoneType can't be used in 'await' expression
However, if you put failing things into a Task or gather them, they just fail silently until you actually await them to retrieve their result.
>>> async def fail():
... print("about to fail...")
... print(1/0)
...
>>> t = asyncio.create_task(fail())
about to fail...
>>> await t # only fails noticeably when retrieving result
Traceback (most recent call last):
...
ZeroDivisionError: division by zero
Consequently, if you put failing tasks into a gather(..., return_exceptions=True) that also contains an infinitely running, non-failing task, the failures are never reported.
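That is exactly what happens in the question's code once the decorator is removed: sync_wait(msg) returns None, so awaiting it raises a TypeError inside every producer, the consumers then wait forever on an empty queue, and gather(..., return_exceptions=True) quietly collects the errors while waiting on coroutines that never finish. A minimal sketch of that failure mode (the worker/forever names are illustrative, not from the question's code):

import time, asyncio

def sync_wait(msg):          # the undecorated version: a plain blocking function
    time.sleep(msg)          # returns None

async def worker():
    await sync_wait(.2)      # TypeError: object NoneType can't be used in 'await' expression
    print("never reached")   # the exception prevents this from running

async def forever():
    while True:              # stands in for a consumer stuck on queue.get()
        await asyncio.sleep(1)

async def main():
    # return_exceptions=True swallows worker()'s TypeError into the eventual
    # result list, and forever() keeps gather() from ever finishing, so the
    # error is never surfaced and nothing is printed.
    await asyncio.gather(worker(), forever(), return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())

Running this hangs silently, just like the question's code without the decorator; with return_exceptions=False the TypeError would instead propagate out of gather() immediately and be visible.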
Answered By - MisterMiyagi