Issue
I have a class which processes a buch of work elements asynchronously (mainly due to overlapping HTTP connection requests) using asyncio
. A very simplified example to demonstrate the structure of my code:
class Work:
...
def worker(self, item):
# do some work on item...
return
def queue(self):
# generate the work items...
yield from range(100)
async def run(self):
with ThreadPoolExecutor(max_workers=10) as executor:
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(executor, self.worker, item)
for item in self.queue()
]
for result in await asyncio.gather(*tasks):
pass
work = Work()
asyncio.run(work.run())
In practice, the workers need to access a shared container-like object and call its methods which are not async
-safe. For example, let's say the worker
method calls a function defined like this:
def func(shared_obj, value):
for node in shared_obj.filter(value):
shared_obj.remove(node)
However, calling func
from a worker might affect the other asynchronous workers in this or any other function involving the shared object. I know that I need to use some synchronization, such as a global lock, but I don't find its usage easy:
asyncio.Lock
can be used only inasync
functions, so I would have to mark all such function definitions asasync
- I would also have to
await
all calls of these functions await
is also usable only inasync
functions, so eventually all functions betweenworker
andfunc
would beasync
- if the
worker
wasasync
, it would not be possible to pass it toloop.run_in_executor
(it does notawait
)
Furthermore, some of the functions where I would have to add async
may be generic in the sense that they should be callable from asynchronous as well as "normal" context.
I'm probably missing something serious in the whole concept. With the threading
module, I would just create a lock and work with it in a couple of places, without having to further annotate the functions. Also, there is a nice solution to wrap the shared object such that all access is transparently guarded by a lock. I'm wondering if something similar is possible with asyncio
...
Solution
I'm probably missing something serious in the whole concept. With the threading module, I would just create a lock...
What you are missing is that you're not really using asyncio
at all. run_in_executor
serves to integrate CPU-bound or legacy sync code into an asyncio application. It works by submitting the function it to a ThreadPoolExecutor
and returning an awaitable handle which gets resolved once the function completes. This is "async" in the sense of running in the background, but not in the sense that is central to asyncio. An asyncio program is composed of non-blocking pieces that use async/await to suspend execution when data is unavailable and rely on the event loop to efficiently wait for multiple events at once and resume appropriate async functions.
In other words, as long as you rely on run_in_executor
, you are just using threading
(more precisely concurrent.futures
with a threading executor). You can use a threading.Lock
to synchronize between functions, and things will work exactly as if you used threading
in the first place.
To get the benefits of asyncio such as scaling to a large number of concurrent tasks or reliable cancellation, you should design your program as async (or mostly async) from the ground up. Then you'll be able to modify shared data atomically simply by doing it between two awaits, or use asyncio.Lock
for synchronized modification across awaits.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.