Issue
I experimented with asyncio.gather as follows:
async def some_work(work_name, timeout, raise_exception=False):
"""Do some work"""
print(f"Start {work_name}")
await asyncio.sleep(timeout)
if raise_exception:
raise RuntimeError(f"{work_name} raise an exception")
print(f"Finish {work_name}")
async def main():
try:
await asyncio.gather(
some_work("work1", 3),
some_work("work2", 1),
some_work("work3", 2),
asyncio.gather(
some_work("work4", 3),
some_work("work5", 1, raise_exception=True),
some_work("work6", 2)
)
)
except RuntimeError as error:
print(error)
if __name__ == '__main__':
asyncio.run(main())
At some point, I decided to make a wrapper over asyncio.gather like this:
# Yes I know, concurrently really
def in_parallel(*aws, loop=None, return_exceptions=False):
return asyncio.gather(aws, loop, return_exceptions)
and use it like this:
async def main():
try:
await in_parallel(
some_work("work1", 3),
some_work("work2", 1),
some_work("work3", 2),
in_parallel(
some_work("work4", 3),
some_work("work5", 1, raise_exception=True),
some_work("work6", 2)
)
)
except RuntimeError as error:
print(error)
if __name__ == '__main__':
asyncio.run(main())
And got a bunch of errors:
D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py:34: RuntimeWarning: coroutine 'some_work' was never awaited in_parallel( RuntimeWarning: Enable tracemalloc to get the object allocation traceback Traceback (most recent call last): File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 46, in asyncio.run(main()) File "C:\Program Files\Python38\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 612, in run_until_complete return future.result() File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 34, in main in_parallel( File "D:/Archive/Projects/PycharmProjects/test/asyncio_gather.py", line 14, in in_parallel return asyncio.gather(aws, loop, return_exceptions) File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 806, in gather fut = ensure_future(arg, loop=loop) File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 673, in ensure_future raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' TypeError: An asyncio.Future, a coroutine or an awaitable is required sys:1: RuntimeWarning: coroutine 'some_work' was never awaited
Can anyone explain why? It's just a wrapper!
Solution
Can anyone explain why? It's just a wrapper!
The wrapper has the proper signature, but it's not correctly invoking asyncio.gather
:
def in_parallel(*aws, loop=None, return_exceptions=False):
# XXX incorrect invocation of `gather`
return asyncio.gather(aws, loop, return_exceptions)
asyncio.gather
expects awaitables to be passed as positional arguments, which is how you were invoking it in the first version of your code. As you invoke it from the wrapper, you are always passing it exactly three positional arguments: aws
(containing a tuple of awaitables passed to in_parallel
), loop
(always None
as you invoke it), and return_exceptions
(a boolean). None of those is an actual awaitable, so gather
raises an exception as soon as it tries to do something with the "awaitables" it received, such as converting them to futures.
The correct way to invoke gather
from in_parallel
is to use the *
operator to pass each element of aws
as separate positional argument, and to pass loop
and return_exceptions
as keyword arguments:
def in_parallel(*aws, loop=None, return_exceptions=False):
return asyncio.gather(*aws, loop=loop, return_exceptions=return_exceptions)
With this modification, your code works as expected. Finally, note that explicit loop
argument is deprecated, so you can omit it from your wrapper.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.