Issue
I want to use threading.Lock() in an async function. asyncio.Lock() is not thread-safe, so I cannot do with await asyncio.Lock():. I need threading.Lock() because this object may be accessed by multiple threads: it is used in a web app, and the server running it can spin up many threads. What is an effective way of doing so? So far I've tried a simple function that uses the lock:
1)
    async def main():
        with await threading.Lock():
            a = 6
        return a

    TypeError: object _thread.lock can't be used in 'await' expression

2)
    async def main():
        async with threading.Lock():
            a = 1564666546
        return a

    AttributeError: __aexit__
Solution
You can't pass a threading.Lock to async with because it is a blocking primitive that is not designed for async usage. More importantly, async with threading.Lock() wouldn't make sense even if it did work, because you would be acquiring a brand new lock, which would always succeed. For locking to make sense, you must have a lock shared between multiple threads, e.g. stored in an attribute of the object, or associated with the object in another way. The rest of this answer will assume that you have a threading.Lock shared between threads.
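To illustrate what "shared" means here, the following is a minimal sketch, assuming a hypothetical Counter class (it does not appear in the question) that keeps the lock in an attribute so that every thread touching the same instance contends for the same lock:

```python
import threading


class Counter:
    """Hypothetical shared object; the name and fields are illustrative."""

    def __init__(self):
        # One lock per instance, created once and reused by every caller.
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        # Blocking acquisition: fine in ordinary threads, not in coroutines.
        with self._lock:
            self.value += 1


def hammer(counter, n):
    # Each thread increments the same Counter instance n times.
    for _ in range(n):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=hammer, args=(counter, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)
```

Creating the lock inside increment() instead of in __init__ would give each call its own lock, which is the same mistake as async with threading.Lock().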
Since threading.Lock always blocks, the only way you can use it from asyncio is to acquire it in a dedicated thread, suspending the execution of the current coroutine until the lock is acquired. This functionality is already covered by the run_in_executor event loop method, which you can apply:
    import asyncio
    import concurrent.futures

    _pool = concurrent.futures.ThreadPoolExecutor()

    async def work(lock, *other_args):
        # lock is a threading.Lock shared between threads
        loop = asyncio.get_event_loop()
        # Acquire the lock in a worker thread, suspending us while waiting.
        await loop.run_in_executor(_pool, lock.acquire)
        ...  # access the object with the lock held
        # Can release directly because release() doesn't block and a
        # threading.Lock can be released from any thread.
        lock.release()
You can make this more elegant to use (and exception-safe) by creating an async context manager:
    import asyncio
    import concurrent.futures
    import contextlib

    _pool = concurrent.futures.ThreadPoolExecutor()

    @contextlib.asynccontextmanager
    async def async_lock(lock):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(_pool, lock.acquire)
        try:
            yield  # the lock is held
        finally:
            lock.release()
Then you can use it as follows:
    # lock is a threading.Lock shared between threads
    async with async_lock(lock):
        ...  # access the object with the lock held
Of course, outside asyncio you wouldn't use any of that; you'd just acquire the lock directly:
    # lock is a threading.Lock shared between threads
    with lock:
        ...  # access the object
Note that we use a separate thread pool instead of passing None to run_in_executor() to reuse the default pool. This is to avoid deadlock in situations where the function that holds the lock itself needs access to the thread pool for other uses of run_in_executor(). If every worker in a shared pool is blocked in lock.acquire, the job submitted by the lock holder can never start, so the lock is never released. By keeping the thread pool private, we avoid the possibility of deadlocking through the use of the same pool by others.
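Putting the pieces together, here is a small end-to-end sketch in which ordinary threads and coroutines update the same state under one threading.Lock. It repeats the async_lock helper from above so that it runs on its own; the shared dict, the worker functions, and the iteration counts are hypothetical and chosen only for the demonstration.

```python
import asyncio
import concurrent.futures
import contextlib
import threading

# Private pool used only for acquiring the lock (and joining threads here).
_pool = concurrent.futures.ThreadPoolExecutor()


@contextlib.asynccontextmanager
async def async_lock(lock):
    loop = asyncio.get_running_loop()
    # Block a pool thread, not the event loop, while waiting for the lock.
    await loop.run_in_executor(_pool, lock.acquire)
    try:
        yield
    finally:
        lock.release()


shared = {"count": 0}           # hypothetical shared state
shared_lock = threading.Lock()  # the one lock everybody uses


def thread_worker(n):
    # Plain threads acquire the lock directly.
    for _ in range(n):
        with shared_lock:
            shared["count"] += 1


async def coro_worker(n):
    # Coroutines go through the async wrapper.
    for _ in range(n):
        async with async_lock(shared_lock):
            shared["count"] += 1


async def main():
    loop = asyncio.get_running_loop()
    threads = [threading.Thread(target=thread_worker, args=(500,)) for _ in range(2)]
    for t in threads:
        t.start()
    await asyncio.gather(coro_worker(100), coro_worker(100))
    for t in threads:
        # Join in the pool so the event loop itself never blocks.
        await loop.run_in_executor(_pool, t.join)
    return shared["count"]


total = asyncio.run(main())
print(total)
_pool.shutdown()
```

Each coroutine holds the lock only for a short, non-awaiting section here. If the code inside async with awaited other work, other threads and coroutines would wait on the lock for that entire time.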
Answered By - user4815162342