Issue
I'd like to download a series of large (~200MB) files, and use the time while they're downloading to do some CPU-intensive processing. I'm investigating asyncio and aiohttp. My understanding is that I can use them to start a large download and then do some heavy computation on the same thread while the download continues in the background.
What I am finding, however, is that the download is paused while the heavy CPU process continues, then resumes as soon as the calculation is done. I include a minimal example below. I visually monitor the process CPU and bandwidth while the script is running. It's clear the download pauses during the ~30s of computation. Am I doing something wrong? Or am I not understanding what aiohttp can do?
import asyncio
import time

import aiofiles
import aiohttp


async def download(session):
    url = 'https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-s390x.sh'  # 280 MB file
    async with session.get(url) as resp:
        async with aiofiles.open('./tmpfile', mode='wb') as f:
            print('Starting the download')
            data = await resp.read()
            print('Starting the file write')
            await f.write(data)
            print('Download completed')


async def heavy_cpu_load():
    await asyncio.sleep(5)  # make sure the download has started
    print('Starting the computation')
    for i in range(200000000):  # takes about 30 seconds on my laptop.
        i ** 0.5
    print('Finished the computation')


async def main():
    async with aiohttp.ClientSession() as session:
        timer = time.time()
        tasks = [download(session), heavy_cpu_load()]
        await asyncio.gather(*tasks)
        print(f'All tasks completed in {time.time() - timer}s')


if __name__ == '__main__':
    asyncio.run(main())
Solution
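asyncio is cooperative: the event loop runs a single coroutine at a time and can only switch to another one at an await. The loop in heavy_cpu_load never awaits, so it holds the event loop for the full ~30 s and the download cannot make progress until it finishes.
If you have a calculation that doesn't need to await anything, you can (and should) run it in a separate thread using loop.run_in_executor so that it runs in the background. Threads are what let the computer run tasks concurrently (although not necessarily truly simultaneously).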
import asyncio
import time

import aiofiles
import aiohttp


async def download(session):
    url = 'https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-s390x.sh'  # 280 MB file
    async with session.get(url) as resp:
        async with aiofiles.open('./tmpfile', mode='wb') as f:
            print('Starting the download')
            data = await resp.read()
            print('Starting the file write')
            await f.write(data)
            print('Download completed')


# not async: a plain blocking function that will run in a worker thread
def heavy_cpu_load():
    print('Starting the computation')
    for i in range(200000000):  # takes about 30 seconds on my laptop.
        i ** 0.5
    print('Finished the computation')


async def main():
    async with aiohttp.ClientSession() as session:
        timer = time.time()
        tasks = [
            download(session),
            # None selects the loop's default ThreadPoolExecutor
            asyncio.get_running_loop().run_in_executor(None, heavy_cpu_load),
        ]
        await asyncio.gather(*tasks)
        print(f'All tasks completed in {time.time() - timer}s')


asyncio.run(main())
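To see the effect without downloading 280 MB, here is a minimal sketch that swaps the download for a "heartbeat" coroutine. It is not part of the original answer: heartbeat and run_demo are hypothetical names, and heavy_cpu_load is scaled down and returns a sum purely for illustration. The heartbeat counts how often the event loop gets a chance to run it while the computation is in progress.

```python
import asyncio


def heavy_cpu_load(n=3_000_000):
    """Blocking, CPU-bound stand-in for the real computation (scaled down)."""
    total = 0.0
    for i in range(n):
        total += i ** 0.5
    return total


async def heartbeat(stop, interval=0.01):
    """Count how many times the event loop lets this coroutine run."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(interval)
    return ticks


async def run_demo(use_executor):
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # yield once so the heartbeat can start
    if use_executor:
        # Worker thread: the event loop stays free to run the heartbeat.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, heavy_cpu_load)
    else:
        # Runs on the event loop thread and blocks it until it returns.
        heavy_cpu_load()
    stop.set()
    return await hb


if __name__ == '__main__':
    print('ticks while blocking the loop:', asyncio.run(run_demo(False)))
    print('ticks with run_in_executor:', asyncio.run(run_demo(True)))
```

The first count should come out far lower than the second, because the heartbeat can only run when the event loop thread is not busy with the computation. The download in the question is starved in the same way.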
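With a pure-Python loop like the one above, the worker thread still holds the GIL most of the time, so the two tasks take turns rather than running truly in parallel. Using numpy (which drops the GIL during many array operations) and aiohttp (network I/O also drops the GIL) should allow your calculations and download to run almost in parallel using only threads. An alternative is to use ProcessPoolExecutor, as outlined in the linked documentation, which lets any function run in parallel even if it doesn't drop the GIL (with a lot of extra limitations).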
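The ProcessPoolExecutor route could look roughly like the sketch below. It is not the answer's code: fake_download is a hypothetical stand-in for the aiohttp download, and heavy_cpu_load takes a parameter and returns a value only to show that arguments and results cross the process boundary. Some of the limitations show up directly: the function has to be defined at module level, and it, its arguments, and its result all have to be picklable.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def heavy_cpu_load(n):
    """CPU-bound work; defined at module level so it can be pickled."""
    total = 0.0
    for i in range(n):
        total += i ** 0.5
    return total


async def fake_download(seconds):
    """Hypothetical stand-in for the aiohttp download in the answer."""
    await asyncio.sleep(seconds)
    return 'download finished'


async def main(n=1_000_000):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as pool:
        # The computation runs in a separate process with its own GIL,
        # while the event loop keeps servicing the "download".
        result, status = await asyncio.gather(
            loop.run_in_executor(pool, heavy_cpu_load, n),
            fake_download(0.1),
        )
    return result, status


if __name__ == '__main__':  # needed when worker processes are spawned
    print(asyncio.run(main()))
```

The `if __name__ == '__main__':` guard matters here: on platforms that start worker processes by re-importing the main module, leaving it out makes each worker try to start the pool again.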
Answered By - Ahmed AEK