Issue
I want to use both ThreadPoolExecutor
from concurrent.futures
and async functions.
My program repeatedly submits a function with different input values to a thread pool. The final sequence of tasks that are executed in that larger function can be in any order, and I don't care about the return value, just that they execute at some point in the future.
So I tried to do this
async def startLoop():
while 1:
for item in clients:
arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))
wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
where the function submitted is:
async def threadWork(obj):
bool = do_something() # needs to execute before next functions
if bool:
do_a() # can be executed at any time
do_b() # ^
where do_b
and do_a
are async functions.The problem with this is that I get the error: TypeError: object Future can't be used in 'await' expression
and if I remove the await, I get another error saying I need to add await
.
I guess I could make everything use threads, but I don't really want to do that.
Solution
I recommend a careful readthrough of Python 3's asyncio development guide, particularly the "Concurrency and Multithreading" section.
The main conceptual issue in your example that event loops are single-threaded, so it doesn't make sense to execute an async coroutine in a thread pool. There are a few ways for event loops and threads to interact:
Event loop per thread. For example:
async def threadWorkAsync(obj): b = do_something() if b: # Run a and b as concurrent tasks task_a = asyncio.create_task(do_a()) task_b = asyncio.create_task(do_b()) await task_a await task_b def threadWork(obj): # Create run loop for this thread and block until completion asyncio.run(threadWorkAsync()) def startLoop(): while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(config.threadPool.submit(threadWork, item)) wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
Execute blocking code in an executor. This allows you to use async futures instead of concurrent futures as above.
async def startLoop(): while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(asyncio.run_in_executor( config.threadPool, threadWork, item)) await asyncio.gather(*arrayOfFutures)
Use threadsafe functions to submit tasks to event loops across threads. For example, instead of creating a run loop for each thread you could run all async coroutines in the main thread's run loop:
def threadWork(obj, loop): b = do_something() if b: future_a = asyncio.run_coroutine_threadsafe(do_a(), loop) future_b = asyncio.run_coroutine_threadsafe(do_b(), loop) concurrent.futures.wait([future_a, future_b]) async def startLoop(): loop = asyncio.get_running_loop() while 1: arrayOfFutures = [] for item in clients: arrayOfFutures.append(asyncio.run_in_executor( config.threadPool, threadWork, item, loop)) await asyncio.gather(*arrayOfFutures)
Note: This example should not be used literally as it will result in all coroutines executing in the main thread while the thread pool workers just block. This is just to show an example of the
run_coroutine_threadsafe()
method.
Answered By - augurar
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.