Issue
I have a bunch of online data that I want to download and process efficiently. Downloading already takes some time, but the CPU-bound processing takes much longer. I'm struggling to combine asyncio with a ProcessPoolExecutor.
import asyncio
import time
import aiohttp
from aiohttp import ClientSession
from concurrent.futures import ProcessPoolExecutor


class WebData:
    def __init__(self, url):
        self.url = url
        self.binary = b''

    async def download(self, client):
        # asyncio.sleep, not time.sleep: a blocking sleep inside a coroutine
        # stalls the event loop and serializes all downloads.
        await asyncio.sleep(0.2)
        try:
            async with client.get(self.url,
                                  timeout=aiohttp.ClientTimeout(total=5)) as resp:
                self.binary = await resp.read()
                print(f'Downloaded {self.url}')
        except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
            pass

    def process(self):
        # Stand-in for the CPU-bound work
        print(f'Start processing {self.url}')
        time.sleep(1)
        print(f'Finished processing {self.url}')


async def main():
    list_urls = [f'https://www.google.com/search?q={i}' for i in range(10)]
    list_obj = [WebData(url) for url in list_urls]
    with ProcessPoolExecutor() as executor:
        async with ClientSession() as session:
            # gather returns only after *every* download has finished ...
            tasks = [obj.download(session) for obj in list_obj]
            await asyncio.gather(*tasks)
        # ... so processing is submitted only once all data is in.
        list_futures = [executor.submit(obj.process) for obj in list_obj]
    return list_futures


if __name__ == '__main__':  # required for ProcessPoolExecutor on Windows/macOS
    res = asyncio.run(main())
This works as expected, but it doesn't accomplish what I'm looking for: it first downloads all the data and only starts processing afterwards, which leaves my cores idle during the download. Is there a way to pipe each downloaded object to the executor while other objects are still downloading?
I found this thread but it isn't what I need.
Solution
You should submit self.process to the pool as soon as the download coroutine for that particular object finishes, instead of waiting for all downloads. To do that, add a separate asynchronous method that awaits download and then submits process to the ProcessPoolExecutor.
class WebData:
    def __init__(self, url):
        ...  # unchanged from the question

    async def download(self, client):
        ...  # unchanged from the question

    def process(self):
        ...  # unchanged from the question

    async def execute(self, session, pool):
        # Wait for this object's download only ...
        await self.download(session)
        # ... then hand it to a worker process immediately, while the
        # other downloads keep running on the event loop.
        return pool.submit(self.process)


async def main():
    list_urls = [f'https://www.google.com/search?q={i}' for i in range(10)]
    list_obj = [WebData(url) for url in list_urls]
    with ProcessPoolExecutor() as pool:
        async with ClientSession() as session:
            list_futures = await asyncio.gather(
                *[obj.execute(session, pool) for obj in list_obj])
    # Leaving the with block waits for all submitted jobs to finish.
    return list_futures


if __name__ == '__main__':
    res = asyncio.run(main())
Answered By - Artyom Vancyan
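If you also want to await the processing results inside the event loop, one option is loop.run_in_executor: it submits the call to the given executor and wraps the resulting future so it can be awaited, which means a worker exception is raised from gather instead of sitting unnoticed in a concurrent.futures.Future. The sketch below is a minimal, self-contained illustration under stated assumptions: download, process, fetch_and_process and pipeline are hypothetical stand-ins (asyncio.sleep replaces the network request, summing bytes replaces the CPU-bound work), not the code from the question.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import List, Optional


def process(data: bytes) -> int:
    """Hypothetical CPU-bound step; module-level so it can be pickled."""
    return sum(data)


async def download(i: int) -> bytes:
    """Hypothetical download; asyncio.sleep stands in for network I/O."""
    await asyncio.sleep(0.01)
    return bytes([i]) * 1000


async def fetch_and_process(i: int, executor: Optional[Executor]) -> int:
    data = await download(i)
    loop = asyncio.get_running_loop()
    # Submitted as soon as *this* download is done; other downloads keep
    # running while the worker computes. executor=None uses the loop's
    # default ThreadPoolExecutor instead of a process pool.
    return await loop.run_in_executor(executor, process, data)


async def pipeline(n: int, executor: Optional[Executor]) -> List[int]:
    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(fetch_and_process(i, executor) for i in range(n)))


if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        print(asyncio.run(pipeline(5, pool)))  # [0, 1000, 2000, 3000, 4000]
```

With a ProcessPoolExecutor the function and its arguments are pickled and sent to a worker, so a bound method such as obj.process also works as long as the object itself is picklable. Passing None runs the same pipeline on the default thread pool, which can be handy for quick testing without spawning processes.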