Issue
With the following snippet, I can't figure out why infiniteTask is not cancelled (it keeps spamming "I'm still standing").
In debug mode, I can see that the Task stored in unfinished is indeed marked as cancelled, but obviously the thread is not cancelled or killed.
Why is the thread not killed when the wrapping task is cancelled? What should I do to stop the thread?
import time
import asyncio

def quickTask():
    time.sleep(1)

def infiniteTask():
    while True:
        time.sleep(1)
        print("I'm still standing")

async def main():
    finished, unfinished = await asyncio.wait({
            asyncio.create_task(asyncio.to_thread(quickTask)),
            asyncio.create_task(asyncio.to_thread(infiniteTask))
        },
        return_when = "FIRST_COMPLETED"
    )
    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)
    print(" finished : " + str(len(finished))) # print '1'
    print("unfinished : " + str(len(unfinished))) # print '1'

asyncio.run(main())
Solution
Cause
If we check the definition of asyncio.to_thread():
# python310/Lib/asyncio/threads.py
# ...
async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
It's actually a thin wrapper around loop.run_in_executor.
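As a rough illustration of that equivalence, the sketch below calls the same function both ways. The helper name where_am_i is made up for this example; the second call simply mirrors the three lines of the to_thread() body quoted above.

```python
import asyncio
import contextvars
import functools
import threading


def where_am_i(label):
    # Hypothetical helper: report whether we are running on the main thread.
    return label, threading.current_thread() is threading.main_thread()


async def main():
    loop = asyncio.get_running_loop()

    # High-level form.
    via_to_thread = await asyncio.to_thread(where_am_i, "job")

    # Roughly what to_thread() does internally, per the source quoted above.
    ctx = contextvars.copy_context()
    call = functools.partial(ctx.run, where_am_i, "job")
    via_executor = await loop.run_in_executor(None, call)

    return via_to_thread, via_executor


via_to_thread, via_executor = asyncio.run(main())
print(via_to_thread, via_executor)
```

Both calls should report that the function ran outside the main thread, which is the only point of the comparison.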
If we then look at how asyncio's own tests handle run_in_executor:
# python310/Lib/test/test_asyncio/test_events.py
# ...
class EventLoopTestsMixin:
    # ...
    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.run_until_complete(
                self.loop.shutdown_default_executor())
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)
You can see that, even after cancelling the future, it still waits for self.loop.shutdown_default_executor().
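To make that concrete, here is a minimal sketch (not taken from the CPython sources) in which the asyncio-side future is cancelled while the worker thread carries on regardless. The names short_job and completed are invented for the example, and the timings are arbitrary.

```python
import asyncio
import time

completed = []  # filled in by the worker thread


def short_job():
    time.sleep(0.2)
    completed.append("ran to the end")


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, short_job)
    await asyncio.sleep(0.05)  # give the worker thread time to start
    fut.cancel()               # cancels the asyncio-side future only
    try:
        await fut
    except asyncio.CancelledError:
        pass
    cancelled = fut.cancelled()
    # As in the test above, wait for the default executor to drain.
    await loop.shutdown_default_executor()
    return cancelled


was_cancelled = asyncio.run(main())
print(was_cancelled, completed)
```

The future reports itself as cancelled, yet the function body still runs to its last line: cancellation only affected what the event loop was waiting on.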
Now let's see what shutdown_default_executor looks like.
# python310/Lib/asyncio/base_events.py
# ...
class BaseEventLoop(events.AbstractEventLoop):
    # ...
    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        if self._default_executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(future,))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, future):
        try:
            self._default_executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)
Here, we can see that it starts another thread to run _do_shutdown, which in turn calls self._default_executor.shutdown with wait=True, and then awaits the result.
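Since asyncio.run() goes through this same shutdown step before returning (Python 3.9+), a program cannot exit while an executor thread is still busy. The following sketch, with an invented slow_job and arbitrary timings, measures that effect.

```python
import asyncio
import time


def slow_job():
    time.sleep(0.3)


async def main():
    loop = asyncio.get_running_loop()
    # Fire and forget: the returned future is deliberately not awaited.
    loop.run_in_executor(None, slow_job)


start = time.monotonic()
asyncio.run(main())  # main() returns at once, but run() waits for the thread
elapsed = time.monotonic() - start
print(f"asyncio.run() took {elapsed:.2f}s")
```

With an infinite loop in place of slow_job, that wait never ends, which matches the behaviour described in the question.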
And this is where shutdown itself is implemented:
# Python310/Lib/concurrent/futures/thread.py
# ...
class ThreadPoolExecutor(_base.Executor):
    # ...
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
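When wait=True, it simply joins every worker thread, that is, it waits for each one to finish on its own. Even cancel_futures=True only cancels work items that are still queued, not ones that are already running.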
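A small experiment along those lines, using a single-worker pool so that the second job is guaranteed to sit in the queue. The job function and the log list are invented for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def job(name, log):
    time.sleep(0.2)
    log.append(name)


log = []
executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(job, "first", log)
time.sleep(0.05)  # let the single worker pick up "first"
queued = executor.submit(job, "second", log)

# Blocks until the worker thread exits; only the queued item is cancelled.
executor.shutdown(wait=True, cancel_futures=True)

print(log, running.cancelled(), queued.cancelled())
```

The job that had already started finishes normally, and only the one still waiting in the queue ends up cancelled.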
Nowhere in any of this is there an attempt to actually cancel a running thread.
To quote the Trio documentation:
Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:
- If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.
- If cancellable=True, then this function immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises.
So, from all of this we learn that there is no way to forcibly terminate an infinite loop running in a thread; cancelling the task only stops the event loop from waiting on it.
Workaround
Now that we know the code running in the thread has to be designed with a bit more care, we need a way to signal the thread that we want it to stop.
We can use an Event for such cases.
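import time
import asyncio
import threading

def blocking_func(event: threading.Event):
    # Check the flag on every iteration so the loop can exit on request.
    while not event.is_set():
        time.sleep(1)
        print("I'm still standing")

async def main():
    # threading.Event rather than asyncio.Event: asyncio primitives are not thread-safe.
    event = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(blocking_func, event))
    await asyncio.sleep(5)
    # now let's stop
    event.set()
    # wait for the thread to notice the flag and return
    await task

asyncio.run(main())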
By checking the event on every iteration, the thread exits on its own and the program terminates gracefully:
I'm still standing
I'm still standing
I'm still standing
I'm still standing
I'm still standing
Process finished with exit code 0
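To connect this back to the original question, one possible refinement is to set the event automatically when the wrapping task is cancelled. The sketch below is only one way to do it: run_stoppable is a hypothetical helper, not an asyncio API, and it assumes the worker accepts a threading.Event as its first argument. It also uses Event.wait(timeout) in place of time.sleep() so that the thread reacts to the stop request without finishing its sleep first.

```python
import asyncio
import threading
import time


def worker(stop, ticks):
    # Event.wait() doubles as an interruptible sleep: it returns True as
    # soon as the event is set, otherwise False once the timeout expires.
    while not stop.wait(timeout=0.05):
        ticks.append(time.monotonic())


async def run_stoppable(func, *args):
    """Hypothetical helper: run func(stop, *args) in a thread and set
    `stop` when the awaiting task finishes or is cancelled."""
    stop = threading.Event()
    try:
        return await asyncio.to_thread(func, stop, *args)
    finally:
        # Runs on normal completion, error, and cancellation alike.
        stop.set()


async def main():
    ticks = []
    task = asyncio.create_task(run_stoppable(worker, ticks))
    await asyncio.sleep(0.3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # give the thread a moment to see the flag
    settled = len(ticks)
    await asyncio.sleep(0.2)  # no new ticks should appear after that
    return settled, len(ticks)


settled, final = asyncio.run(main())
print(settled, final)
```

With this shape, task.cancel() behaves the way the question expected: the task is cancelled and the thread winds down shortly afterwards, because the function running in it cooperates.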
Answered By - jupiterbjy