Issue
Background:
I'm using asyncio to make API requests asynchronously, because receiving a response body takes a long time but my subscription allows me to issue requests at a faster rate. response.headers contains a field 'time_reset' that gives the earliest time at which a new request would succeed; new calls made before that time result in status code 429 (rate limit exceeded).
Goals:
- Asynchronously make new requests without having to wait for the results of previous ones
- Set a varying time delay for each request, so that a new call is only made after the current time has passed 'time_reset' (which is updated with each call)
Problem:
I've managed to achieve goal 1 (when manually setting a fixed delay between requests), but failed at goal 2. What I'm doing now is:
- Set a global variable 'time_reset' whose value can be read/modified instantly when a new call is made
- Create tasks to retrieve the data in a loop; between task creations, await some time based on the current value of 'time_reset'
- Wrap the tasks in a list and gather them at the end.
However, with the code below, it seems no delay is awaited between requests. I don't know why this happens or where I'm going wrong. How should I modify the code (or write something new) to achieve the goals above?
Code:
import asyncio
import aiohttp
import time

async def fetch(url, headers):
    global time_reset
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            # earliest time at which the next request will not exceed the rate limit
            time_reset = int(response.headers.get('time_reset'))
            if response.status == 200:
                data = await response.json()
            else:
                data = None
            return data

async def main():
    global time_reset
    time_reset = 0
    request_urls = <a list of urls to call>
    headers = <headers with renewed token>
    tasks = []
    for url in request_urls:
        task = asyncio.create_task(fetch(url, headers))
        tasks.append(task)
        current_time = int(time.time())
        delay = max(0, time_reset - current_time + 0.5)  # +0.5 to prevent lags
        await asyncio.sleep(delay)
    datas = await asyncio.gather(*tasks)

await main()  # running in a Jupyter notebook
Related Post:
Python Aiohttp Asyncio: how to create delays between each task. There, the top answers set fixed delays instead of self-updating ones.
Any advice would be appreciated.
Solution
If I understand the implementation of aiohttp, the following things are true (see the sketch after this list):
- As soon as the expression await session.get() is evaluated, the response headers are available; response.headers is not an awaitable.
- The response body may take a significant amount of time to retrieve. That is the problem you are trying to solve.
- The header 'time_reset' indicates the earliest moment at which the server will accept the next session.get call. Multiple response.json calls can proceed in parallel while you are waiting to make the next session.get call.
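As a minimal sketch of the first point (with a hypothetical URL), the headers can be read as soon as session.get returns, while the body is only transferred when response.json() is awaited:
import aiohttp

async def probe():
    async with aiohttp.ClientSession() as session:
        response = await session.get("https://api.example.com/item")  # hypothetical URL
        print(response.headers.get("time_reset"))  # headers are already available here
        return await response.json()  # the (possibly slow) body download happens here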
If that's all correct, then you have to separate the logic of checking the response headers from the logic of downloading the response body, because you need to inspect the headers before you know how long to wait before making the next session.get call. What you want to parallelize is only the downloading of the response body.
You must issue the session.get calls in a serial fashion, one after the other, with the proper time delay between them. Since you have multiple Tasks running, you can use an asyncio.Lock object to achieve this.
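For illustration, here is a bare sketch of that pattern, independent of aiohttp: several Tasks run concurrently, but the section guarded by the lock executes in only one Task at a time.
import asyncio

lock = asyncio.Lock()

async def worker(i):
    async with lock:  # only one Task may hold the lock at a time
        print(f"task {i} entered the critical section")
        await asyncio.sleep(0.1)  # stands in for the rate-limited call

async def demo():
    await asyncio.gather(*(worker(i) for i in range(3)))

asyncio.run(demo())  # in a notebook, use: await demo()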
The main problem with your implementation is that you have put the time delay in the wrong place: instead of between Task creations, it needs to go between session.get calls.
Here is a possible implementation.
I have created a small class to act as the "gate" that serializes the calls to session.get. That eliminates the need for the global variable. The method Fetcher.get returns a response object, which your Tasks can then use to download the data.
Unfortunately 'time_reset' is not a standard header, so I cannot test this code myself. I'm fairly certain there is a big problem with your time logic: the standard Python function time.time() returns the floating-point number of seconds since the epoch (January 1, 1970), while the header presumably returns a timestamp in some internet format (a formatted string). I have mostly ignored this problem because I don't have enough information to do anything else. Also, I don't understand why you are converting all the time values to integers; that introduces round-off error into the sleep times, whereas floating point would just work.
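If the header does turn out to be a formatted string rather than a number, a small helper along these lines could normalize it to epoch seconds. This is a hypothetical sketch; check your API's documentation for the actual format of 'time_reset'.
from email.utils import parsedate_to_datetime

def parse_time_reset(value):
    # assumption: the header is either epoch seconds or an HTTP-date string
    try:
        return float(value)  # e.g. "1700000000.5"
    except ValueError:
        return parsedate_to_datetime(value).timestamp()  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"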
import asyncio
import aiohttp
import time

class Fetcher:
    def __init__(self):
        self.next_time = 0.0
        self.lock = asyncio.Lock()

    async def get(self, session, url, headers):  # returns a response object
        async with self.lock:  # serialize the session.get calls across Tasks
            t = time.time()
            if t < self.next_time:
                await asyncio.sleep(self.next_time - t)
            response = await session.get(url, headers=headers)
            # assumes 'time_reset' is an epoch value; adjust the parsing if not
            self.next_time = float(response.headers.get('time_reset'))
            return response

async def fetch(fetcher, url, headers):
    async with aiohttp.ClientSession() as session:
        response = await fetcher.get(session, url, headers)
        async with response:
            if response.status == 200:
                data = await response.json()
            else:
                data = None
            return data

async def main():
    request_urls = "<a list of urls to call>"
    headers = "<headers with renewed token>"
    tasks = []
    fetcher = Fetcher()
    for url in request_urls:
        task = asyncio.create_task(fetch(fetcher, url, headers))
        tasks.append(task)
    datas = await asyncio.gather(*tasks)

await main()  # running in a Jupyter notebook
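Note that await main() only works here because Jupyter already runs an event loop; in a plain script you would call asyncio.run(main()) instead.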
Answered By - Paul Cornelius