Issue
If I have a coroutine that's consuming items from an async generator what is the "best" way to terminate that loop from an external condition?
Consider this,
while not self.shutdown_event.is_set():
async with self.external_lib_client as client:
async for message in client:
if self.shutdown_event.is_set():
break
await self.handle(message)
If I set shutdown_event
it will break out of the while loop, but not until the next message
has been handled by the async for
loop. What is the correct way to structure the async for
iterator such that it can short circuit if a condition has been met between it yielding results?
Is there a standard way to add a Timeout
?
Solution
One way would be to move the iteration to an async def
and use cancelation:
async def iterate(client):
async for message in client:
# shield() because we want cancelation to cancel retrieval
# of the next message, not ongoing handling of a message
await asyncio.shield(self.handle(message))
async with self.external_lib_client as client:
iter_task = asyncio.create_task(iterate(client))
shutdown_task = asyncio.create_task(self.shutdown_event.wait())
await asyncio.wait([iter_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED)
if iter_task.done():
# iteration has completed, access result to propagate the
# exception if one was raised
iter_task.result()
shutdown_task.cancel()
else:
# shutdown was requested, cancel iteration
iter_task.cancel()
Another way would be to turn shutdown_event
into a one-shot async stream and use aiostream to monitor both. That way the for
loop gets an object when the shutdown event is signaled and can break out of the loop without bothering to finish waiting for the next message:
# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())
async with self.external_lib_client as client, \
aiostream.stream.merge(done_stream, client).stream() as stream:
async for message in stream:
# the merged stream will provide a bogus value (whatever
# `shutdown_event.wait()` returned) when the event is set,
# so check that before using `message`:
if self.shutdown_event.is_set():
break
await self.handle(message)
Note: since the code in the question is not runnable, the above examples are untested.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.