Issue
I'm doing a bunch of async calls in parallel like so:
txs = await asyncio.gather(*[fetch_tx_details(s["signature"]) for s in sigs])
These calls can sometimes fail and so I'm decorating each with backoff like so:
@retry_with_backoff(10)
async def fetch_tx_details(sig):
    # stuff
Where backoff is defined like so:
def retry_with_backoff(retries=5, backoff_in_ms=100):
    def wrapper(f):
        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            x = 0
            while True:
                try:
                    return await f(*args, **kwargs)
                except Exception as e:
                    print('Fetch error:', e)
                    if x == retries:
                        raise
                    else:
                        sleep_ms = (backoff_in_ms * 2 ** x +
                                    random.uniform(0, 1))
                        time.sleep(sleep_ms / 1000)
                        x += 1
                        print(f'Retrying {x + 1}/{retries}')
        return wrapped
    return wrapper
The problem I have is that if any of the calls fail, they are retried sequentially rather than in parallel. E.g., if I try 1000 calls and 100 of them fail, I now have 100 retries that execute sequentially (slow) rather than in parallel (fast).
How do I change my code to parallelize retries?
Solution
Don't use time.sleep(). It blocks execution completely, including all other coroutines. Always use the asyncio.sleep() coroutine in asyncio tasks, as that yields execution to other tasks that are not blocked. As the asyncio documentation puts it:
"sleep() always suspends the current task, allowing other tasks to run."
You are already using a loop; if you switch to await asyncio.sleep(...), the task will pause and let others run until the retry wait time is over:
import asyncio
import functools
import random


def retry_with_backoff(retries=5, backoff_in_ms=100):
    def wrapper(f):
        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            x = 0  # number of retries performed so far
            while True:
                try:
                    return await f(*args, **kwargs)
                except Exception as e:
                    print('Fetch error:', e)
                    if x == retries:
                        raise
                    else:
                        # Exponential backoff plus up to 1 ms of jitter
                        sleep_ms = (backoff_in_ms * 2 ** x +
                                    random.uniform(0, 1))
                        # Suspends only this task; others keep running
                        await asyncio.sleep(sleep_ms / 1000)
                        x += 1
                        print(f'Retrying {x}/{retries}')
        return wrapped
    return wrapper
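One way to check that retries now overlap is to run the decorator against a coroutine that fails on purpose. The following is only a sketch: flaky_fetch, the attempts dictionary, and the chosen retry settings are hypothetical stand-ins for fetch_tx_details and are not part of the original code. The decorator is repeated in compact form (without the print calls) so the snippet runs on its own.

```python
import asyncio
import functools
import random
import time


def retry_with_backoff(retries=5, backoff_in_ms=100):
    def wrapper(f):
        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            x = 0
            while True:
                try:
                    return await f(*args, **kwargs)
                except Exception:
                    if x == retries:
                        raise
                    sleep_ms = backoff_in_ms * 2 ** x + random.uniform(0, 1)
                    # Non-blocking wait: other tasks keep running meanwhile.
                    await asyncio.sleep(sleep_ms / 1000)
                    x += 1
        return wrapped
    return wrapper


# Hypothetical bookkeeping: number of calls made per signature.
attempts = {}


@retry_with_backoff(retries=3, backoff_in_ms=50)
async def flaky_fetch(sig):
    """Hypothetical stand-in for fetch_tx_details: fails on its first two calls."""
    attempts[sig] = attempts.get(sig, 0) + 1
    if attempts[sig] <= 2:
        raise ConnectionError(f"simulated failure for {sig}")
    return f"tx-{sig}"


async def main(n=100):
    attempts.clear()
    start = time.perf_counter()
    results = await asyncio.gather(*[flaky_fetch(i) for i in range(n)])
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(f"{len(results)} results in {elapsed:.2f}s")
```

Each simulated call waits about 50 ms and then about 100 ms before succeeding, so with asyncio.sleep() the whole batch should finish in a fraction of a second. With time.sleep() in its place, the same waits would run one after another and add up to roughly 100 × 0.15 s.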
Anywhere await is used in a coroutine, control can be passed back to the event loop to switch tasks. The chain of connected await calls is what links the event loop to the current point of execution in an asyncio task, and it is the means by which a coroutine co-operates, allowing other tasks to do work whenever there is a reason to wait for something, such as for enough time to have passed.
time.sleep(), on the other hand, waits without handing control over to other tasks. Instead, the whole thread is blocked from doing anything; here that means the asyncio event loop is blocked, so the other tasks can't execute either.
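The difference between the two kinds of sleep can be measured directly. This is a minimal sketch rather than anything from the original code: blocking_wait, cooperative_wait, and timed are hypothetical helper names, and the task count and delay are arbitrary.

```python
import asyncio
import time


async def blocking_wait(delay):
    # Blocks the whole event loop thread; no other task can run meanwhile.
    time.sleep(delay)


async def cooperative_wait(delay):
    # Suspends only this task and hands control back to the event loop.
    await asyncio.sleep(delay)


async def timed(worker, n=5, delay=0.1):
    """Run n copies of worker concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*[worker(delay) for _ in range(n)])
    return time.perf_counter() - start


async def main():
    blocking = await timed(blocking_wait)
    cooperative = await timed(cooperative_wait)
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = asyncio.run(main())
    print(f"time.sleep:    {blocking:.2f}s")
    print(f"asyncio.sleep: {cooperative:.2f}s")
```

With five tasks and a 0.1 s delay, the time.sleep() version should take about five times the delay because the waits run back to back, while the asyncio.sleep() version should take about one delay because the waits overlap.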
Answered By - Martijn Pieters