Issue
I have a block of code that takes a long time to execute and is CPU-intensive. I want to run that block several times and use the full power of my CPU for that. Looking at asyncio, I understood that it is mainly for asynchronous communication, but is also a general tool for asynchronous tasks.
In the following example, time.sleep(y) is a placeholder for the code I want to run. In this example every coroutine is executed one after the other and the execution takes about 8 seconds.
import asyncio
import logging
import time


async def _do_compute_intense_stuff(x, y, logger):
    logger.info('Getting it started...')
    for i in range(x):
        time.sleep(y)
    logger.info('Almost done')
    return x * y


logging.basicConfig(format='[%(name)s, %(levelname)s]: %(message)s', level='INFO')
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
co_routines = [
    asyncio.ensure_future(_do_compute_intense_stuff(2, 1, logger.getChild(str(i))))
    for i in range(4)
]
logger.info('Made the co-routines')
responses = loop.run_until_complete(asyncio.gather(*co_routines))
logger.info('Loop is done')
print(responses)
When I replace time.sleep(y) with asyncio.sleep(y) it returns nearly immediately. With await asyncio.sleep(y) it takes about 2 seconds.
Is there a way to parallelize my code using this approach, or should I use multiprocessing or threading? Would I need to put the time.sleep(y) into a Thread?
Solution
Executors accomplish this by running work in a pool of threads (or processes, if you prefer). asyncio is meant for code that spends much of its time waiting on input/output operations, such as writing to files or loading websites.
For CPU-heavy operations that don't just wait on I/O, it's recommended to hand the work to an executor instead, and, in my opinion, concurrent.futures provides a very nice wrapper for that, similar to asyncio's own. Note that in standard CPython the global interpreter lock keeps threads from executing Python bytecode in parallel, so for CPU-bound work a process pool is usually the better choice.
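The reason await asyncio.sleep(y) makes your code finish faster is that each coroutine hands control back to the event loop while it sleeps, so all four sleeps overlap and the total is about 2 seconds instead of 8. Without await, asyncio.sleep(y) only creates a coroutine object that is never run, which is why that version returns almost immediately. Neither helps with CPU-heavy operations: there is no I/O to wait for, so a coroutine that is busy computing never yields and blocks the event loop.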
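To make the timing behaviour concrete, here is a minimal sketch of the awaited variant from the question. The names wait_then_multiply and main are hypothetical, chosen only for illustration, and the example assumes Python 3.7+ for asyncio.run.

```python
import asyncio
import time


async def wait_then_multiply(x, y):
    # Awaiting asyncio.sleep suspends this coroutine, so the event loop
    # can run the other coroutines in the meantime.
    for _ in range(x):
        await asyncio.sleep(y)
    return x * y


async def main(delay):
    # Hypothetical driver: run four coroutines concurrently and time them.
    start = time.perf_counter()
    results = await asyncio.gather(
        *(wait_then_multiply(2, delay) for _ in range(4))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == '__main__':
    results, elapsed = asyncio.run(main(1))
    print(results, f'{elapsed:.1f}s')
```

The sleeps overlap only because each coroutine awaits. Swapping in a blocking call such as time.sleep, or real computation, would make the coroutines run one after the other again.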
Here is a multiprocessing example. To switch it from multiprocessing to multithreading, simply replace ProcessPoolExecutor with ThreadPoolExecutor.
import concurrent.futures
import time


def a(z):
    time.sleep(1)
    return z * 12


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(a, i) for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            data = future.result()
            print(data)
This is a simplified version of the example provided in the documentation for executors.
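If you would rather keep the asyncio structure from the question, one possible approach is to hand the heavy function to an executor with loop.run_in_executor and await the results. This is only a sketch: cpu_bound and run_all are hypothetical names, and the sum of squares merely stands in for the real computation.

```python
import asyncio
import concurrent.futures


def cpu_bound(n):
    # Stand-in for the real CPU-heavy work (an assumption for illustration).
    # It must be a module-level function so a process pool can pickle it.
    return sum(i * i for i in range(n))


async def run_all(executor, inputs):
    loop = asyncio.get_running_loop()
    # Each call runs in the executor; the event loop stays free meanwhile.
    futures = [loop.run_in_executor(executor, cpu_bound, n) for n in inputs]
    # gather returns the results in the same order as the inputs.
    return await asyncio.gather(*futures)


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        print(asyncio.run(run_all(executor, [10**6] * 4)))
```

Passing the executor in as a parameter keeps the choice between ProcessPoolExecutor and ThreadPoolExecutor in one place, so the same coroutine can be reused with either.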
Answered By - Neil