Issue
I am currently trying to understand how asyncio works in Python. I want to speed up file writing using asynchronous file writing with the aiofiles library.
I have a synchronous writing function:
def sync_writer():
    with open("file.txt", "w") as f:
        for i in range(500000):
            f.write(f"line#{i}\n")
and its asynchronous alternative that runs multiple asynchronous writers:
import asyncio
import aiofiles

async def write_fun(file, writer_n, n_writers):
    for i in range(writer_n, 500000, n_writers):
        await file.write(f"line#{i}\n")

async def async_writer(n_writers):
    async with aiofiles.open("file.txt", "w") as f:
        await asyncio.gather(*[write_fun(f, i, n_writers) for i in range(n_writers)])
Then I run both of these functions and track their completion times. I run the asynchronous writing function with 10 writers:
import time

async def main():
    t1 = time.time()
    await async_writer(10)
    print(time.time() - t1)

    t1 = time.time()
    sync_writer()
    print(time.time() - t1)

asyncio.run(main())
The final timing surprised me: it took 36 seconds for the async function and only 0.33 seconds for the sync function to write the file.
I think this is an IO-bound task, so asynchronous writing should help here, but it doesn't.
Why is async writing so slow here? How do I use asyncio and all the related libraries in order to actually get an advantage from them?
Solution
The way aiofiles seems to operate is that each call is dispatched to a thread-pool executor, which then simply calls the regular blocking file operation. So, every time you execute await f.write(…), the following happens:
- The operation is enqueued into the queue of the thread pool
- The thread pool wakes up one of its threads
- The thread calls the blocking function
- The result is communicated back to the event loop
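In other words, each awaited write behaves roughly like pushing the blocking call through the event loop's executor. Here is a simplified sketch of that mechanism (an illustration of the steps above, not aiofiles' actual implementation; the names are made up):

import asyncio

async def write_via_executor(loop, blocking_file, text):
    # Enqueue the blocking write on the default thread pool and suspend
    # until one of the pool's threads has executed it
    await loop.run_in_executor(None, blocking_file.write, text)

async def demo():
    loop = asyncio.get_running_loop()
    with open("demo.txt", "w") as f:
        await write_via_executor(loop, f, "hello\n")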
This wouldn't be so bad if it weren't for these factors:
- You try to write with multiple writers to the same file, but writing to a single file with the standard Python API is inherently single-threaded. Multiple threads can't possibly provide a benefit; you'd need something like os.pwrite for that (see the sketch after this list)
- Each write operation is tiny, only a couple of bytes per line, which makes all the overhead significant
- File IO is buffered (typically 4 kiB), so most of the time the threads do nothing but copy the tiny line into the file buffer. They don't even do anything that could block!
- You don't have any other activity that the event loop could do while waiting for this operation
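For illustration, here is a minimal sketch of os.pwrite (POSIX only): it writes at an explicit offset instead of the shared file position, so multiple threads could in principle target disjoint regions of the same file descriptor. The file name and chunk sizes are made up for the example:

import os

fd = os.open("parallel.txt", os.O_WRONLY | os.O_CREAT, 0o644)
chunk = b"x" * 4096
# These two calls could come from different threads, since neither
# depends on a shared file position
os.pwrite(fd, chunk, 0)      # first 4 kiB
os.pwrite(fd, chunk, 4096)   # next 4 kiB, independent of the first call
os.close(fd)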
Contrary to popular belief, parallel IO often makes sense, especially with modern SSDs and RAM bandwidth (meaning that parallel access to the page cache is necessary to exhaust memory bandwidth). But Python's API makes this hard to pull off with a single file. Overall, you get much better IO performance with multiple files and IO operations of reasonable size, for example 64 kiB at a time.
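As a rough sketch of that advice (the function and file names are invented for the example), parallel writers can each get their own file and issue reasonably sized writes:

import asyncio
import aiofiles

async def multi_file_writer(n_files=4, chunk_size=64 * 1024):
    # One file per writer: no shared file position, so the writers
    # can actually proceed in parallel
    async def write_one(idx):
        async with aiofiles.open(f"part{idx}.txt", "w") as f:
            await f.write("x" * chunk_size)  # one reasonably sized write

    await asyncio.gather(*[write_one(i) for i in range(n_files)])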
Here is a simple proof-of-concept attempt to fix your issues. What we do is buffer multiple write operations in the main thread until a buffer size limit is exceeded, and only then do we schedule a write operation:
import io

class BufferedWriter:
    """Collects small writes in memory and forwards them in large chunks."""

    def __init__(self, asyncfile, bufsize=64 * 1024):
        self.asyncfile = asyncfile
        self.bufsize = bufsize
        self.buf = io.StringIO()

    async def write(self, text):
        # Await the underlying write only when a flush was actually scheduled
        if op := self.write_async(text):
            await op

    def write_async(self, text):
        # Append to the in-memory buffer; return a pending write operation
        # once the buffer grows past the size limit, else None
        buf = self.buf
        buf.write(text)
        return None if buf.tell() < self.bufsize else self.flush()

    def flush(self):
        # Hand the buffered text to the async file and reset the buffer
        buf = self.buf
        rtrn = self.asyncfile.write(buf.getvalue())
        buf.seek(0)
        buf.truncate()
        return rtrn
async def async_writer2():
    async with aiofiles.open("file.txt", "w") as f:
        buffered = BufferedWriter(f)
        for i in range(500000):
            await buffered.write(f"line#{i}\n")
        await buffered.flush()
On my system this takes 0.18 seconds, compared to 11 seconds for your version and 0.07 seconds for the serial version. Not great performance, but maybe a reasonable tradeoff to make the IO non-blocking.
We can do better with some double-buffering. While a background thread writes to the output file, the main thread can fill the next buffer.
async def async_writer3():
    def write_all(outfile):
        # Generator that yields a pending write operation whenever the
        # buffer fills up, plus a final flush at the end
        buffered = BufferedWriter(outfile)
        for i in range(500000):
            if async_op := buffered.write_async(f"line#{i}\n"):
                yield async_op
        yield buffered.flush()

    async with aiofiles.open("file.txt", "w") as f:
        last_write = None
        for writeop in write_all(f):
            # Wait for the previous background write before starting the
            # next one, so at most one write is in flight at any time
            if last_write:
                await last_write
            last_write = asyncio.create_task(writeop)
        if last_write:
            await last_write
This gets the runtime down to 0.14 seconds.
Note how we only keep one write operation running as a background task at any time. If we don't, there is a race condition between them and the order of output lines can get mixed: aiofiles does not ensure that parallel operations on the same file run in the order they were scheduled.
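For contrast, here is a hypothetical unsafe variant (names invented for the example) that launches all writes at once, much like the original async_writer did, and therefore loses the ordering guarantee:

import asyncio
import aiofiles

async def unsafe_writer(chunks):
    # Several writes are in flight at the same time, so aiofiles may
    # execute them in any order and the chunks can land interleaved
    async with aiofiles.open("file.txt", "w") as f:
        await asyncio.gather(*[f.write(c) for c in chunks])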
Answered By - Homer512