Issue
I recently found and reproduced a memory leak caused by the use of asyncio.wait. Specifically, my program periodically executes some function until stop_event
is set. I simplified my program to the snippet below (with a reduced timeout to demonstrate the issue better):
async def main():
stop_event = asyncio.Event()
while True:
# Do stuff here
await asyncio.wait([stop_event.wait()], timeout=0.0001)
asyncio.run(main())
While this looked innocuous to me, it turns out there's a memory leak here. If you execute the code above, you'll see the memory usage growing to hundreds of MBs in a matter of minutes. This surprised me and took a long time to track down. I was expecting that after the timeout, anything I was waiting for would be cleaned up (since I'm not keeping any references to it myself). However, that turns out not to be the case.
Using gc.get_referrers, I was able to infer that every time I call asyncio.wait(...)
, a new task is created that holds a reference to the object returned by stop_event.wait()
and that task is kept around forever. Specifically, len(asyncio.all_tasks())
keeps increasing over time. Even if the timeout is passed, the tasks are still there. Only upon calling stop_event.set()
do these tasks all finish at once and does memory usage decrease drastically.
After discovering that, this note in the documentation made me try asyncio.wait_for instead:
Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.
It turns out that actually behaves like I expected. There are no references kept after the timeout, and memory usage and number of tasks stay flat. This is the code without a memory leak:
async def main():
stop_event = asyncio.Event()
while True:
# Do stuff here
try:
await asyncio.wait_for(event.stop_event(), timeout=0.0001)
except asyncio.TimeoutError:
pass
asyncio.run(main())
While I'm happy this is fixed now, I don't really understand this behavior. If the timeout has been exceeded, why keep this task holding a reference around? It seems like that's a recipe for creating memory leaks. The note about not cancelling futures is also not clear to me. What if we don't explicitly cancel the future, but we just don't keep a task holding a reference after the timeout? Wouldn't that work as well?
It would be very much appreciated if anybody could shine some light on this. Thanks a lot!
Solution
The key concept to understand here is that the return value of wait()
is a tuple (completed, pending)
tasks.
The typical way to use wait()
-based code is like this:
async def main():
stop_event = asyncio.Event()
pending = [... add things to wait ...]
while pending:
completed, pending = await asyncio.wait(pending, timeout=0.0001)
process(completed) # e.g. update progress bar
pending.extend(more_tasks_to_wait)
wait()
with timeout isn't used to have one coroutine to wait for another coroutines/tasks to finish, instead its primary use case is for periodically flushing completed tasks, while letting the unfinished tasks to continue "in the background", so cancelling the unfinished tasks automatically isn't really desirable, because you usually want to continue waiting for those pending tasks again in the next iteration.
This usage pattern resembles the select()
system call.
On the other hand, the usage pattern of await wait_for(xyz, )
is basically just like doing await xyz
with a timeout. It's a common and much simpler use case.
Answered By - Lie Ryan
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.