Issue
The problem I think I am having is related to the event loops and calling cancel from a different event loop than it was created in. The code that I think is causing the issue is the synchronous method that is passed to the client library that connects to an external source, when the external source loses connection it calls this method.
The problem then is that the stop and start methods are both async and on their own work fine the start method is created in a task and it waits for the sleep to end then calls stop which cancels the task and seems to work fine, from the sync method I need to create a new event loop to call stop which also works but while you can call cancel the task never seems to close and I get the desired result of reconnect only after the sleep has finished. If I print the self._task
variable before and after it would show that it has been cancelled but it clearly keeps running.
<Task pending coro=<AsyncAKSServer.start() running at aks_server.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f551e268f18>()]> cb=[<TaskWakeupMethWrapper object at 0x7f551e268e58>()]>
<Task pending coro=<AsyncAKSServer.start() running at aks_server.py:88> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x7f551e268e58>()]>
Unfortunately I can't give the exact code for IP reasons but below is a sample that shows what I am trying to do, I am unable to update the client library that the sync method is sent to currently as we are in out busy period so changing that to async isn't an option. There is a real chance I have misunderstood the docs and how this is supposed to work any help is appreciated.
import asyncio
class AsyncServer:
def __init__(self):
self._task = None
self._duration = 1
async def run(self):
while True:
self._task = asyncio.create_task(self.start())
try:
await self._task
except asyncio.CancelledError:
print('Start task cancelled')
async def start(self):
await asyncio.sleep(self._duration)
await self.stop()
async def stop(self):
self._taks.cancel()
def sync_request(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.stop())
loop.stop()
loop.close()
def main():
server = AsyncServer()
asyncio.run(server.run())
Solution
This problem is indeed caused by having to create a new event loop, the fix for this is to store a reference to the event loop when in the start method. You can then use this to call asyncio.run_coroutine_threadsafe(self.stop(), self._loop)
which passes the same loop in and makes the cancel work as expected.
Answered By - bobthemac
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.