Issue
I'm trying to figure out how to limit the number of concurrent HTTP requests made to a server using Python's asyncio and httpx modules. I came across this StackOverflow answer.
It proposes asyncio.Semaphore for stopping multiple consumers from making too many requests. While this answer works perfectly, it uses explicit loop construction, not asyncio.run. When I replace the explicit loop construction with asyncio.run, the behavior of the code changes. Instead of doing all 9 requests, it now executes only three requests and then stops.
import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i in range(9)
    ]
    await asyncio.gather(*tasks, return_exceptions=True)  # await moment all downloads done


if __name__ == '__main__':
    asyncio.run(main())
This prints out:
downloading 0 will take 3 second(s)
downloading 1 will take 1 second(s)
downloading 2 will take 3 second(s)
downloaded 1
downloaded 0
downloaded 2
I had to change await asyncio.gather(*tasks) to await asyncio.gather(*tasks, return_exceptions=True) so that the code doesn't raise a RuntimeError. Otherwise, it raises the following error (I have asyncio debug mode turned on):
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
Traceback (most recent call last):
  File "/home/rednafi/workspace/personal/demo/demo.py", line 66, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/rednafi/workspace/personal/demo/demo.py", line 62, in main
    await asyncio.gather(*tasks) # await moment all downloads done
  File "/home/rednafi/workspace/personal/demo/demo.py", line 52, in safe_download
    async with sem: # semaphore limits num of simultaneous downloads
  File "/usr/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-5' coro=<safe_download() running at /home/rednafi/workspace/personal/demo/demo.py:52> cb=[gather.<locals>._done_callback() at /usr/lib/python3.9/asyncio/tasks.py:764] created at /home/rednafi/workspace/personal/demo/demo.py:58> got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
Apart from that, the only change I made is replacing the explicit loop with asyncio.run.
Why did the behavior of the code change, and how can I bring back the old, expected behavior?
Solution
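The problem is that the Semaphore created at top level caches the event loop that is active during its creation (an event loop automatically created by asyncio and returned by get_event_loop() at startup). asyncio.run(), on the other hand, creates a fresh event loop on each call. As a result, you're trying to await a semaphore from a different event loop, which fails. This also explains why exactly three downloads finish: the first three tasks acquire the semaphore without having to wait, while every later task has to wait on a future created on the old loop and fails with the RuntimeError, which return_exceptions=True then silently collects. As always, hiding the exception without understanding its cause only leads to further issues down the line.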
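If you do keep return_exceptions=True, the exceptions are not actually lost: gather() returns them in its result list in place of the return values. The following is a minimal sketch of surfacing them instead of discarding them; the `flaky` coroutine is a hypothetical stand-in for a download that sometimes fails.

```python
import asyncio


async def flaky(i):
    # Hypothetical stand-in for a download: every third call fails.
    await asyncio.sleep(0.01)
    if i % 3 == 0:
        raise RuntimeError(f"download {i} failed")
    return i


async def main():
    results = await asyncio.gather(
        *(flaky(i) for i in range(6)), return_exceptions=True
    )
    # With return_exceptions=True, each exception sits in the result list
    # at the position of the awaitable that raised it.
    failures = [r for r in results if isinstance(r, BaseException)]
    for exc in failures:
        print("error:", exc)
    return results, failures


if __name__ == "__main__":
    asyncio.run(main())
```

Logging or re-raising the collected failures like this would have pointed straight at the "attached to a different loop" error instead of silently stopping after three downloads.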
To fix the issue properly, you should create the semaphore while inside asyncio.run(). For example, the simplest fix can look like this:
# ...
sem = None


async def main():
    global sem
    sem = asyncio.Semaphore(3)
    # ...
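Filled out into a complete script, the global-variable fix might look like the sketch below. The shortened sleep times and the fact that download() returns its code are assumptions made for illustration. Because the semaphore is created inside main(), calling asyncio.run() more than once also works: each run creates a semaphore under its own fresh loop. (As far as I know, on Python 3.10 and later asyncio's synchronization primitives bind to the running loop on first use rather than at construction, so the question's top-level semaphore no longer fails there; creating it inside main() is correct on every version.)

```python
import asyncio
from random import uniform

sem = None  # created inside main(), i.e. under the loop asyncio.run() starts


async def download(code):
    # Simulated I/O; shortened from the original 1-3 s for illustration.
    await asyncio.sleep(uniform(0.01, 0.03))
    return code


async def safe_download(i):
    async with sem:  # at most 3 downloads in flight
        return await download(i)


async def main():
    global sem
    sem = asyncio.Semaphore(3)
    return await asyncio.gather(*(safe_download(i) for i in range(9)))


if __name__ == "__main__":
    # Each asyncio.run() call starts a fresh loop, and main() rebinds sem.
    print(asyncio.run(main()))
    print(asyncio.run(main()))
```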
A more elegant approach is to completely remove sem from top level and explicitly pass it to safe_download:
async def safe_download(i, limit):
    async with limit:
        return await download(i)


async def main():
    # limit parallel downloads to 3 at most
    limit = asyncio.Semaphore(3)
    # you don't need to explicitly call create_task() if you call
    # `gather()` because `gather()` will do it for you
    await asyncio.gather(*[safe_download(i, limit) for i in range(9)])
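To check that the limit actually holds under asyncio.run(), the passed-in-semaphore version can be instrumented with a counter of downloads in flight. The `in_flight`/`peak` bookkeeping, the short random sleep, and download() returning its code are additions for this sketch, not part of the answer.

```python
import asyncio
from random import uniform

in_flight = 0  # downloads currently running
peak = 0       # highest value in_flight has reached


async def download(code):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        await asyncio.sleep(uniform(0.01, 0.03))  # simulated I/O
        return code
    finally:
        in_flight -= 1


async def safe_download(i, limit):
    async with limit:
        return await download(i)


async def main():
    limit = asyncio.Semaphore(3)  # created under the loop asyncio.run() started
    results = await asyncio.gather(*[safe_download(i, limit) for i in range(9)])
    return results, peak


if __name__ == "__main__":
    results, peak_seen = asyncio.run(main())
    print(results, "peak concurrency:", peak_seen)
```

All nine downloads complete, and the peak stays at 3: the first three tasks enter download() immediately and the remaining six wait on the semaphore until a slot frees up.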
Answered By - user4815162342