Issue
I've read a lot of different articles that explain async in Python, but they all give an example with asyncio.sleep(x), like this one:
import asyncio

async def test1():
    await asyncio.sleep(1)
    print(1)

async def test2():
    print(2)

async def main():
    await asyncio.gather(test1(), test2())

asyncio.run(main())  # prints 2, then 1
And in this case everything is clear to me: the await in function test1 says that while asyncio.sleep is executing, we can do something else, for example execute function test2.
What I don't understand is how async can be useful if I don't use sleeps in my code. How can I run functions concurrently in that case? For example, how do I run the functions test1 and test2 concurrently in the example below?
import asyncio
import time

async def calculate(a):
    return a**a

async def test1():
    x = await calculate(1111111)
    print('done!')

async def test2():
    for i in range(100):
        print('.', end='')

async def main():
    await asyncio.gather(test1(), test2())

asyncio.run(main())  # prints 'done!' before the dots
Solution
Asyncio is great when you have code that needs to wait for things. Every await expression is an opportunity for other code to run: code that no longer has to wait for something.
The easiest way for documentation to show that something is waiting is the asyncio.sleep() function, because it's simple to understand and doesn't require external libraries or network services to work.
There are loads of things that might have to wait:

- Any network interaction involves loads of waiting; network communication is guaranteed to be slower than your Python code.
- Filesystem I/O also has to wait a lot, because even when using SSDs and caches, I/O is slower than CPU cycles. (Caveat: there are not many options to exploit this from asyncio yet, because OS support is lacking. Libraries for asyncio file I/O tend to fall back to using threads instead.)
- Interprocess communication (e.g. where you'd use the subprocess module elsewhere), because the OS, not your code, decides when the other program will run and produce output; see the sketch after this list.
- Coordination between different tasks, where one thing has to wait until something else is done. Usually those other things are not done yet because they are themselves waiting for something.
- Interacting with humans involves waiting too, because people are way, way slower than computers.
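For instance, here is a minimal sketch of the interprocess-communication case, using asyncio's subprocess support (the echo command and its output are just placeholders):

import asyncio

async def run_command():
    # the OS, not our code, decides when the child process runs and
    # produces output; awaiting lets other tasks run in the meantime
    proc = await asyncio.create_subprocess_exec(
        "echo", "hello",
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    print(stdout.decode().strip())

asyncio.run(run_command())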
Any use of higher-level constructs such as a database will involve many of the above, so be sure to check if there is an asyncio-compatible library that'll let you do what you want for those.
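For instance, a minimal sketch assuming the asyncpg PostgreSQL driver, with a placeholder DSN and query:

import asyncio
import asyncpg  # third-party asyncio-compatible PostgreSQL driver

async def fetch_users():
    # connecting and querying both involve network waits, so both are awaitable
    conn = await asyncpg.connect("postgresql://user:password@localhost/mydb")
    try:
        rows = await conn.fetch("SELECT id, name FROM users")
        for row in rows:
            print(row["id"], row["name"])
    finally:
        await conn.close()

asyncio.run(fetch_users())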
In your second example hardly anything waits. The only point where the code has to wait a little is when you use print(), because that involves I/O (writing to the terminal), but print() is not built to work with asyncio, so your code can't do anything else while that happens. print() is a blocking function, which means that asyncio can't run anything else while it blocks. For asyncio to shine, you want to avoid blocking.
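As a minimal sketch of how your second example could actually overlap work: hand the blocking, CPU-bound calculate() to an executor so the event loop stays free, and yield explicitly inside tight loops with await asyncio.sleep(0). A ProcessPoolExecutor is used here because a pure-Python computation would hold the GIL even in a thread:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def calculate(a):  # a plain, blocking, CPU-bound function
    return a ** a

async def test1(executor):
    loop = asyncio.get_running_loop()
    # run in a separate process; the event loop is free to run test2 meanwhile
    x = await loop.run_in_executor(executor, calculate, 1111111)
    print("done!")

async def test2():
    for i in range(100):
        print(".", end="", flush=True)
        await asyncio.sleep(0)  # yield control so other tasks get a turn

async def main():
    with ProcessPoolExecutor() as executor:
        await asyncio.gather(test1(executor), test2())

if __name__ == "__main__":  # guard required for ProcessPoolExecutor on some platforms
    asyncio.run(main())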
I've included a more complex example below: a script that downloads multiple URLs listed in a file given on the command line. It makes use of the aiohttp library to handle HTTP requests, aiofiles for file I/O, and tqdm to display progress bars. It can be configured for how many URLs it'll process in parallel. Disclaimer: I am a contributor to the aiohttp project, but not a very active one.
#!/usr/bin/env python
import asyncio
import traceback
from contextlib import AsyncExitStack, contextmanager

import aiofiles
import aiohttp
import tqdm


@contextmanager
def put_exceptions(log, *exceptions):
    # log any exception raised in the wrapped block, then suppress it
    exceptions = exceptions or (Exception,)
    try:
        yield
    except exceptions:
        log(f"Exception: {traceback.format_exc()}")


def filename_from_response(response):
    # prefer the name from the Content-Disposition header, fall back to the URL
    disp = response.content_disposition
    if disp and disp.filename:
        return disp.filename
    return response.url.name


async def worker(queue, session, main_progress):
    async with AsyncExitStack() as worker_context:
        worker_context.enter_context(put_exceptions(print))
        while True:
            url = await queue.get()
            async with AsyncExitStack() as task_context:
                task_context.callback(queue.task_done)
                task_context.callback(main_progress.update)
                async with session.get(url) as response:
                    filename = filename_from_response(response)
                    size = int(response.headers.get("content-length", 0)) or None
                    pbar = tqdm.tqdm(
                        desc=filename,
                        leave=False,
                        total=size,
                        unit="iB",
                        unit_scale=True,
                        unit_divisor=1024,
                        colour="cyan",
                    )
                    task_context.callback(pbar.close)
                    # stream the response body to disk in chunks
                    async with aiofiles.open(filename, "wb") as outf:
                        while chunk := await response.content.read(8192):
                            await outf.write(chunk)
                            pbar.update(len(chunk))
                    pbar.refresh()


async def main(input_file, worker_count):
    with tqdm.tqdm(
        total=0, unit="f", colour="green", miniters=1, smoothing=0.1
    ) as pbar:
        queue = asyncio.Queue()
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            workers = [
                asyncio.create_task(worker(queue, session, pbar), name=f"worker{i + 1}")
                for i in range(worker_count)
            ]
            async with aiofiles.open(input_file) as f:
                async for url in f:
                    queue.put_nowait(url.strip())  # drop the trailing newline
                    pbar.total += 1
                    pbar.refresh()
            # wait for the queue to drain, then wind down the workers
            await queue.join()
            for worker_task in workers:
                worker_task.cancel()
            await asyncio.wait(workers)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Download URLs to disk, given an input file listing the URLs"
    )
    parser.add_argument("filename")
    parser.add_argument(
        "-w", dest="worker_count", help="worker count", default=5, type=int
    )
    args = parser.parse_args()
    try:
        asyncio.run(main(args.filename, args.worker_count))
    except KeyboardInterrupt:
        pass
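Assuming the script is saved as download.py and urls.txt lists one URL per line, a typical invocation would be python download.py urls.txt -w 10, which downloads with ten parallel workers.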
In use, the script displays a green overall progress bar plus a cyan bar for each in-flight download (the demonstration used sample URLs taken from a set of sample satellite images).
The script creates a number of extra tasks, each executing the worker() coroutine, which uses a shared session object to make an HTTP request for each URL; a loop then pulls in the response data in chunks to write to disk, which also makes it possible to monitor progress. There are loads of points of waiting now:
- The input file is read, line by line, in an async for loop. This means that if the script has to wait for the OS to produce data from disk, it can do something else.
- Each worker first has to wait for URLs to be made available via a queue.
- Once a worker has a URL, the async with session.get() call leads to network operations that have to wait for a remote server to start responding.
- For each response that comes back, we have to wait for more data to arrive from the remote server.
- For each chunk of data that arrives, writing it to disk takes more waiting still.
- Once all URLs have been read from the input file, the main function has to wait for the workers to empty the queue, and then for them to finish downloading the URLs they are still processing.
The tqdm progress bars should at least make it easier to see that things are still moving, and that the downloads and disk writes are happening in parallel.
Answered By - Martijn Pieters