Issue
I am trying to process many actions asynchronously: I would like to send actions to my event loop as they arrive and run them in a ProcessPoolExecutor at the same time. Suppose I don't know all the jobs I will be running at the beginning; then I can't define all the jobs first and start the event loop afterwards. The only solution I found is to run a main thread that receives the actions, and a second thread that calls loop.run_forever, and this seems to work. However, I haven't seen any example of two separate threads using the same loop in this way. Is there another way to solve this problem, and if so, what problems could there be with my solution?
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
import threading

executor = ProcessPoolExecutor(max_workers=3)

def do_work(eventloop, value):
    future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
    future.add_done_callback(run_job_success)

def process_action(value):
    print("Processing %i" % value)
    time.sleep(1)
    return value

def run_job_success(f):
    print("Success : %s" % f.result())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop_thread = threading.Thread(target=loop.run_forever)
    loop_thread.start()
    while True:
        msg = recv()
        if msg is not None:
            do_work(loop, msg)
EDIT: I receive the jobs with the recv method.
Solution
What you're trying to do is a little bit ambiguous - you say you don't know all the jobs you want to run at the beginning of the program. That's fine - but how do you find out what jobs you want to run? In any case, your test program above can (and should) be re-written to be single-threaded by using BaseEventLoop.call_soon to schedule all the do_work calls you want to make before you start the event loop:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time

def do_work(eventloop, value):
    future = eventloop.run_in_executor(executor, functools.partial(process_action, value))
    future.add_done_callback(run_job_success)

def process_action(value):
    print("Processing %i" % value)
    time.sleep(1)
    return value

def run_job_success(f):
    print("Success : %s" % f.result())

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=3)
    loop = asyncio.get_event_loop()
    for i in range(5):
        loop.call_soon(do_work, loop, i)
    loop.run_forever()
Or it could be refactored a little bit further to use coroutines instead of callbacks, which is generally the preferred style when using asyncio:
import time
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor

def do_work(loop, value):
    return loop.run_in_executor(executor, functools.partial(process_action, value))

def process_action(value):
    print("Processing %i" % value)
    time.sleep(1)
    return value

@asyncio.coroutine
def main(loop):
    tasks = [do_work(loop, i) for i in range(5)]
    for fut in asyncio.as_completed(tasks):
        result = yield from fut
        print("Success : %s" % result)

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=3)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
This also makes it easier to exit the program once all the work is done, rather than having to use Ctrl+C with loop.run_forever.
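As a side note: on Python 3.5+ the same coroutine can be written with async def / await, and on 3.7+ asyncio.run replaces the manual loop management (the @asyncio.coroutine decorator was removed entirely in Python 3.11). A sketch of the modern equivalent:

```python
import time
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor

def process_action(value):
    print("Processing %i" % value)
    time.sleep(1)
    return value

async def main(executor):
    loop = asyncio.get_running_loop()
    # Submit every job to the process pool, then print results as they finish.
    tasks = [loop.run_in_executor(executor, functools.partial(process_action, i))
             for i in range(5)]
    for fut in asyncio.as_completed(tasks):
        print("Success : %s" % await fut)

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=3)
    asyncio.run(main(executor))
```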
Your current approach is safe (loop.run_in_executor uses loop.call_soon_threadsafe under the covers, which is the only way you can safely/correctly schedule work in an event loop from a separate thread), it's just overly-complicated and unnecessary; asyncio is designed so that programs using it are single-threaded (except when blocking operations need to be run, which is what run_in_executor is there for).
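To illustrate that last point, here is a minimal sketch (not from the original question) of handing work from a plain thread to a running loop with call_soon_threadsafe, the thread-safe primitive mentioned above:

```python
import asyncio
import threading

def producer(loop):
    # Runs in an ordinary thread; hands callbacks to the loop thread-safely.
    for i in range(3):
        loop.call_soon_threadsafe(print, "scheduled from thread:", i)
    # Ask the loop to stop once everything has been handed over.
    loop.call_soon_threadsafe(loop.stop)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    threading.Thread(target=producer, args=(loop,)).start()
    loop.run_forever()  # runs the queued callbacks, then stops
    loop.close()
```

Plain call_soon from another thread would not be safe; call_soon_threadsafe also wakes the loop if it is idle.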
Answered By - dano