Issue
I have a simple class that leverages an async generator to retrieve a list of URLs:
```python
import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)


class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """
        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()
                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)
        This returns an async generator
        """
        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()
```
When I execute this main part of the code:
```python
if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()
```
The log prints out:
[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]
Since `responses` is an async generator, I expect it to yield one response from the async generator (which should only send the request upon actually yielding), send a separate request to the endpoint with no `x` parameter, and then yield the next response from the async generator. This should flip back and forth between a request with an `x` parameter and a request with no parameters. Instead, it yields all the responses with an `x` parameter first, followed by all of the requests that have no parameters.
Something similar happens when I do:
```python
ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()
```
And the log prints:
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
Instead, what I want is:
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
There are times when I want to retrieve all of the responses first before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (e.g., the generator returns paginated search results, and I want to follow further links from each page before moving on to the next page).
What do I need to change to achieve the required result?
Solution
Leaving aside the technical question of whether `responses` is an async generator (it's not, as Python uses the term), your problem lies in `as_completed`. `as_completed` starts a bunch of coroutines in parallel and provides a means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original `concurrent.futures.as_completed` works on thread-based futures, which revolve around parallel execution. Conceptually, the same is true of asyncio futures.
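For comparison, a small sketch with the thread-based `concurrent.futures` API. The `work` function and its delays are made up for illustration: `submit()` starts each job right away on a worker thread, and `as_completed()` merely hands the futures back in the order they finish.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(i, delay):
    # Hypothetical job: pretend to do I/O for `delay` seconds.
    time.sleep(delay)
    return i


with ThreadPoolExecutor(max_workers=3) as pool:
    # All three jobs start running as soon as they are submitted.
    futures = [pool.submit(work, i, d) for i, d in enumerate([0.6, 0.2, 0.4])]
    # as_completed only collects them, fastest first.
    order = [f.result() for f in as_completed(futures)]

print(order)  # [1, 2, 0]
```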
Your code obtains only the first (fastest-arriving) result and then starts doing something else, also using asyncio. The remaining coroutines passed to `as_completed` are not frozen merely because no one is collecting their results: they keep doing their jobs in the background, and once done they are ready to be `await`ed (in your case by the code inside `as_completed`, which you access using `loop.run_until_complete()`). I would venture to guess that the URL without parameters takes longer to retrieve than the URL with just the parameter `x`, which is why it gets printed after all the other coroutines.
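A minimal, network-free sketch of this behaviour, assuming a hypothetical `fake_get` coroutine that sleeps instead of sending an HTTP request (the delays are arbitrary, and `asyncio.run` is used instead of the question's `LOOP`). Like the question's code, it takes only the first result from `as_completed` and then awaits something unrelated:

```python
import asyncio

log = []


async def fake_get(label, delay):
    # Hypothetical stand-in for _get_url: sleeps instead of sending HTTP.
    await asyncio.sleep(delay)
    log.append(label)
    return label


async def main():
    coros = [fake_get(f'x={i}', 0.02 * (i + 1)) for i in range(5)]
    # as_completed wraps every coroutine in a task, so all five are
    # scheduled as soon as the first result is requested.
    for fut in asyncio.as_completed(coros):
        first = await fut
        break
    # "Do something else" for a while; the remaining tasks keep running.
    await fake_get('other', 0.3)
    return first


first = asyncio.run(main())
print(first)  # x=0
print(log)    # ['x=0', 'x=1', 'x=2', 'x=3', 'x=4', 'other']
```

All the remaining `x=...` entries reach the log before `other`, even though nothing ever collected their results.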
In other words, the fact that those log lines are printed means that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, then don't ask for it; execute the requests serially instead:
```python
def get_routes(self, urls):
    # Each request is sent only when the caller asks for the next item.
    for url in urls:
        yield self.loop.run_until_complete(self._get_url(url))
```
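A network-free sketch of the serial approach, assuming a hypothetical `fake_get` coroutine in place of `_get_url` and a freshly created event loop passed in explicitly. Part 1 shows the interleaving the question asked for; part 2 shows the catch: the same `run_until_complete` call fails when it is made from code that is already running on that loop.

```python
import asyncio

log = []


async def fake_get(label):
    # Hypothetical stand-in for _get_url: sleeps instead of sending HTTP.
    await asyncio.sleep(0.01)
    log.append(label)
    return label


def get_routes(loop, labels):
    # Serial version: each request starts only when the next item is requested.
    for label in labels:
        yield loop.run_until_complete(fake_get(label))


loop = asyncio.new_event_loop()

# 1. Driven from synchronous top-level code, the requests interleave.
for response in get_routes(loop, ['x=0', 'x=1', 'x=2']):
    loop.run_until_complete(fake_get('other'))


# 2. The same kind of call from inside a coroutine already running on
#    this loop raises RuntimeError, because the loop cannot be re-entered.
async def reenter():
    try:
        loop.run_until_complete(loop.create_future())
    except RuntimeError as exc:
        return str(exc)


error = loop.run_until_complete(reenter())
loop.close()

print(log)    # ['x=0', 'other', 'x=1', 'other', 'x=2', 'other']
print(error)
```

Part 2 is why this generator cannot be consumed from an `async def` that runs on the same loop.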
But this is a poor way of using asyncio: its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun just once, at top level. This is typically done with a construct like `loop.run_until_complete(main())` or `loop.run_forever()`. As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making `get_routes` an actual async generator:
```python
async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result
```
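Python's own terminology can be checked with the standard `inspect` module. The two toy functions below are hypothetical, with the HTTP calls stripped out, but they have the same shape as the question's `get_routes` (a plain `def` with `yield`) and the `async def` version above:

```python
import inspect


def get_routes_sync(urls):
    # Same shape as the question's get_routes: a plain def containing yield.
    for url in urls:
        yield url


async def get_routes_async(urls):
    # Same shape as the answer's get_routes: async def containing yield.
    for url in urls:
        yield url


print(inspect.isgeneratorfunction(get_routes_sync))   # True
print(inspect.isasyncgenfunction(get_routes_sync))    # False
print(inspect.isasyncgenfunction(get_routes_async))   # True
```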
Now you can have a `main()` coroutine that looks like this:
```python
async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

# LOOP is the module-level event loop defined in the question's code
LOOP.run_until_complete(main())
```
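To check the interleaving without touching the network, here is a sketch in which `FakeFetcher` is a hypothetical stand-in for `ASYNC_GENERATOR`: its `_get_url` sleeps instead of using aiohttp, and the httpbin URLs are only labels. It uses `asyncio.run` (Python 3.7+) rather than the question's module-level `LOOP`.

```python
import asyncio

log = []


class FakeFetcher:
    """Hypothetical stand-in for ASYNC_GENERATOR: no aiohttp, no network."""

    def __init__(self, n_semaphore=3):
        self.semaphore = asyncio.Semaphore(n_semaphore)

    async def _get_url(self, url):
        async with self.semaphore:
            await asyncio.sleep(0.01)  # pretend network latency
            log.append(url)
            return {'url': url, 'status': 200}

    async def get_routes(self, urls):
        # A real async generator: each request is sent only when iterated.
        for url in urls:
            result = await self._get_url(url)
            yield result


async def main():
    ag = FakeFetcher()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(3)]
    async for response in ag.get_routes(urls):
        # simulate `next` with async iteration, as in main() above
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break


asyncio.run(main())
for url in log:
    print(url)
```

The log alternates between an `x=...` request and the parameterless one, which is the order the question asked for.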
Answered By - user4815162342