Issue
I am searching for a huge number of addresses on the web, and I want to use both asyncio and ProcessPoolExecutor in my task to search the addresses quickly.
async def main():
    n_jobs = 3
    addresses = [list of addresses]
    _addresses = list_splitter(data=addresses, n=n_jobs)
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        futures_list = []
        for _address in _addresses:
            futures_list += [asyncio.get_event_loop().run_in_executor(executor, execute_parallel, _address)]
        for f in tqdm(as_completed(futures_list, loop=asyncio.get_event_loop()), total=len(_addresses)):
            results = await f

asyncio.get_event_loop().run_until_complete(main())
Expected:
The execute_parallel function should run in parallel.
Error:
Traceback (most recent call last):
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 228, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 224, in main
    results = await f
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 533, in _wait_for_one
    return f.result()  # May raise f.exception().
TypeError: can't pickle coroutine objects
Solution
I'm not sure I'm answering the correct question, but it appears the intent of your code is to run your execute_parallel function across several processes using asyncio. The TypeError is a clue: execute_parallel appears to be a coroutine function, so running it through run_in_executor produces a coroutine object in the worker process, and that return value cannot be pickled back to the parent. Instead of ProcessPoolExecutor, try a normal multiprocessing Pool and set up a separate asyncio event loop to run in each worker. You might set up one process per core and let asyncio work its magic within each process.
import asyncio
import multiprocessing

def run_loop(addresses):
    # each worker process gets its own event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [loop.create_task(execute_parallel(address)) for address in addresses]
    done, _ = loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    return [task.result() for task in done]

def main():
    n_jobs = 3
    addresses = [list of addresses]
    _addresses = list_splitter(data=addresses, n=n_jobs)
    with multiprocessing.Pool(processes=n_jobs) as pool:
        # consume the iterator so the batches are actually processed
        results = list(pool.imap_unordered(run_loop, _addresses))
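For context, execute_parallel itself isn't shown in the question, so the following is only a guess at its shape: a minimal sketch assuming each address is a URL fetched over HTTP with aiohttp, which is what lets the coroutines inside each worker's loop overlap their network I/O.

import aiohttp

async def execute_parallel(address):
    # hypothetical implementation: fetch a single address and return the response body
    async with aiohttp.ClientSession() as session:
        async with session.get(address) as response:
            return await response.text()

In practice you would probably share one ClientSession per worker rather than opening one per request, but the overall structure is the same.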
I've used Pool.imap_unordered with great success, but depending on your needs you may prefer Pool.map or some other functionality. You can play around with chunksize or with the number of addresses in each list to achieve optimal results (i.e., if you're getting a lot of timeouts, you may want to reduce the number of addresses being processed concurrently), as sketched below.
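As a rough illustration of that tuning, and assuming list_splitter(data, n) splits data into n roughly equal sublists and run_loop is defined as above, you could cap the number of addresses per batch and let chunksize decide how many batches a worker grabs at a time:

import multiprocessing

def main():
    addresses = [list of addresses]
    n_jobs = 3
    batch_size = 50  # addresses handled concurrently inside one event loop
    n_batches = max(1, len(addresses) // batch_size)
    batches = list_splitter(data=addresses, n=n_batches)
    with multiprocessing.Pool(processes=n_jobs) as pool:
        results = []
        # chunksize=1 hands out one batch at a time; raise it to reduce inter-process overhead
        for batch_results in pool.imap_unordered(run_loop, batches, chunksize=1):
            results.extend(batch_results)

A smaller batch_size means fewer simultaneous requests per worker (helpful if you see timeouts), at the cost of more batches being shuttled between processes.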
Answered By - Jason Shaffner