Issue
I have been trying to wrap my head around async and aiohttp lately, with limited success, and could use some help please.
I would like to fire off API requests like clockwork, with a small fixed interval between them, say 0.1s (assuming an API rate limit of 10 requests/second), using aiohttp. When each result comes back, I would like to perform a couple of checks on it which, if successful, will terminate the function early.
Following some examples I found online, I have built the following script, which almost does what I expect, except that the asynchronous HTTP GET requests aren't actually being sent every 0.1s. They seem to be a bit slower, around 0.25s to 0.3s apart, which is roughly how long each request takes. That means it's not offering any benefit over running the requests serially. Could someone please point out where I can change the code to get the desired behaviour? I would also like to optimise it where possible: the GET requests always hit the same endpoint, so there may be optimisations to be made there, such as sharing the session object.
Thanks
import time
import datetime
import aiohttp
import asyncio

url = 'http://www.randomnumberapi.com/api/v1.0/random'


async def main():
    async with aiohttp.ClientSession() as session:
        for i in range(100):
            start = time.time()
            print(f'{i}: {datetime.datetime.now()}')
            await asyncio.sleep(0.1)
            print(f'{i}: {time.time() - start}')
            async with session.get(url) as response:
                answer_list = await response.json()
                print(f'{i}: {time.time() - start}')
                ans = answer_list[0]
                if i == 25:
                    print('early exit condition met')
                    break
        return(i)


loop = asyncio.get_event_loop()
ans = loop.run_until_complete(main())
print(ans)
which returns...
0: 2021-06-04 14:55:38.917310
0: 0.10055017471313477
0: 1.1943635940551758
1: 2021-06-04 14:55:40.111763
1: 0.10061764717102051
1: 0.4190835952758789
2: 2021-06-04 14:55:40.530876
2: 0.10037660598754883
2: 0.39967823028564453
3: 2021-06-04 14:55:40.930675
3: 0.10052680969238281
3: 0.4090113639831543
4: 2021-06-04 14:55:41.339719
4: 0.10062289237976074
4: 0.4102184772491455
5: 2021-06-04 14:55:41.749971
5: 0.10028338432312012
5: 0.33177995681762695
6: 2021-06-04 14:55:42.081782
6: 0.10028529167175293
6: 0.32780933380126953
7: 2021-06-04 14:55:42.409627
7: 0.10028696060180664
7: 0.3641927242279053
8: 2021-06-04 14:55:42.773969
8: 0.10053634643554688
8: 0.4099152088165283
9: 2021-06-04 14:55:43.183998
9: 0.10070633888244629
9: 0.4089639186859131
10: 2021-06-04 14:55:43.593011
10: 0.10048651695251465
10: 0.3309924602508545
11: 2021-06-04 14:55:43.924210
11: 0.1008145809173584
11: 0.38551807403564453
12: 2021-06-04 14:55:44.309783
12: 0.10041999816894531
12: 0.4093167781829834
13: 2021-06-04 14:55:44.719141
13: 0.10042858123779297
13: 0.409212589263916
14: 2021-06-04 14:55:45.128383
14: 0.10032796859741211
14: 0.5117332935333252
15: 2021-06-04 14:55:45.640148
15: 0.10029864311218262
15: 0.4099447727203369
16: 2021-06-04 14:55:46.050127
16: 0.10030388832092285
16: 0.5113239288330078
17: 2021-06-04 14:55:46.561480
17: 0.10030794143676758
17: 0.5102083683013916
18: 2021-06-04 14:55:47.071729
18: 0.10080623626708984
18: 0.5144610404968262
19: 2021-06-04 14:55:47.586245
19: 0.10080385208129883
19: 0.4096643924713135
20: 2021-06-04 14:55:47.995943
20: 0.10080790519714355
20: 0.5115323066711426
21: 2021-06-04 14:55:48.507522
21: 0.10048961639404297
21: 0.47547459602355957
22: 2021-06-04 14:55:48.983046
22: 0.10050201416015625
22: 0.4206221103668213
23: 2021-06-04 14:55:49.403701
23: 0.10030198097229004
23: 0.5344793796539307
24: 2021-06-04 14:55:49.938217
24: 0.10035061836242676
24: 0.4128684997558594
25: 2021-06-04 14:55:50.351116
25: 0.10033893585205078
25: 0.4100759029388428
early exit condition met
25
Solution
You can use a BoundedSemaphore to limit the number of concurrent API requests.
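As a quick illustration of the idea (this is only a sketch, separate from the full solution below; worker and demo are placeholder names), a semaphore caps how many coroutines can be inside its async with block at the same time:

import asyncio

async def worker(sema, i):
    async with sema:               # at most 3 workers hold the semaphore at once
        print(f'{i} started')
        await asyncio.sleep(1)     # stand-in for an HTTP request
        print(f'{i} finished')

async def demo():
    sema = asyncio.BoundedSemaphore(3)
    await asyncio.gather(*(worker(sema, i) for i in range(10)))

asyncio.run(demo())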
The reason you were not seeing async behaviour in your code is that you are awaiting the response of the async call (session.get) inside the for loop. In each iteration you wait for the request to return before moving on to the next iteration, which is equivalent to calling the URLs in sequence; the iterations are not all started at once, as you may have thought.
If you want to run a group of tasks concurrently and manage them as a unit, you can use asyncio.gather.
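For illustration only, here is a minimal sketch of that difference (the fetch helper is a made-up name, not part of the answer's code): awaiting each call inside the loop serialises the requests, while wrapping them in tasks and gathering them lets them all be in flight together.

import asyncio
import aiohttp

URL = 'http://www.randomnumberapi.com/api/v1.0/random'

async def fetch(session, i):
    async with session.get(URL) as response:
        return await response.json()

async def sequential():
    async with aiohttp.ClientSession() as session:
        for i in range(10):
            await fetch(session, i)          # next iteration only starts after this request finishes

async def concurrent():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, i)) for i in range(10)]
        return await asyncio.gather(*tasks)  # all ten requests are in flight at once

asyncio.run(concurrent())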
import datetime
import aiohttp
import asyncio

MAX_CONCURRENT_API_REQUESTS = 10
URL = 'http://www.randomnumberapi.com/api/v1.0/random'


async def main():
    tasks = []
    sema = asyncio.BoundedSemaphore(MAX_CONCURRENT_API_REQUESTS)
    first_required_random = None
    async with aiohttp.ClientSession() as session:
        for i in range(100):
            # stagger the task starts by 0.1s
            await asyncio.sleep(0.1)
            tasks.append(asyncio.create_task(async_call_api(sema, session, i)))
        try:
            await asyncio.gather(*tasks, return_exceptions=False)
        except MyRandomNumber as err:
            if first_required_random is None:
                first_required_random = err.args[0]
            # cancel the remaining in-flight requests
            for t in tasks:
                try:
                    t.cancel()
                except asyncio.CancelledError:
                    pass
            return first_required_random
        else:
            print('Required condition not satisfied')


async def async_call_api(sema, session, i):
    print(f'{i}: Start {datetime.datetime.now()}')
    async with sema:  # at most MAX_CONCURRENT_API_REQUESTS requests in flight
        response = await session.get(URL)
        answer_list = await response.json()
        print(f'{i}: End {datetime.datetime.now()}')
        ans = answer_list[0]
        print(f'Ans: {ans}')
        if ans == 25:
            raise MyRandomNumber(ans)


class MyRandomNumber(Exception):
    pass


loop = asyncio.new_event_loop()
ans = loop.run_until_complete(main())
Adding await asyncio.sleep(0.1) just before appending to the task list ensures the requests are started at least 0.1s apart.
I updated the code to cancel the remaining tasks (URL requests) once the required number is found. Unfortunately, this is not 100% reliable: in the milliseconds the code spends cancelling tasks, some of the requests may already have returned their results. Nonetheless, you will always get the first hit, thanks to first_required_random.
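If those stray responses matter, one possible refinement (a sketch only, assuming tasks is the same list built in main(); cancel_pending is a hypothetical helper, not part of the answer above) is to cancel just the tasks that have not finished and then await them once more so the cancellations are actually processed:

import asyncio

async def cancel_pending(tasks):
    # cancel only the tasks that have not yet produced a result
    for t in tasks:
        if not t.done():
            t.cancel()
    # let the event loop deliver the cancellations; return_exceptions=True
    # keeps gather from re-raising them here
    await asyncio.gather(*tasks, return_exceptions=True)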
Answered By - Shiva