Issue
How can I implement the functionality of asyncio's as_completed with anyio?
I have a message bus that picks up a user command and directs it to the appropriate handler. The handler may generate a new domain event that should be picked up by the bus too. Using asyncio.as_completed, I can run multiple tasks concurrently, get the result of each task as it completes, check whether a new event was generated, and then handle this new event. I would like to use anyio, but I don't know how.
This is roughly what I am doing with asyncio:
import asyncio
import itertools
import anyio
async def handle(event: str):
    """Handle a single event by delegating to ``handle_event``.

    Args:
        event: The event to process.
    """
    await handle_event(event)
async def handle_event(event: str):
    """Run the handlers for ``event`` and recursively handle follow-up events.

    The handlers run concurrently and their results are consumed in
    completion order via ``asyncio.as_completed``. Whenever a handler
    returns ``"event"``, a follow-up is dispatched back into this function
    inside a fresh anyio task group.

    Args:
        event: The event to process; ``"event"`` selects the slow_2/slow_5
            handlers, anything else selects two slow_1 handlers.
    """
    # Only the coroutines of the selected branch are created.
    coros = [slow_2(), slow_5()] if event == "event" else [slow_1(), slow_1()]

    for next_done in asyncio.as_completed(coros):
        result = await next_done

        # A handler signals a follow-up by returning "event".
        # NOTE(review): the follow-up payload is a list, not a str - confirm intent.
        new_events = [["", ""]] if result == "event" else []
        if not new_events:
            continue

        # The task group waits for every follow-up before the next result is read.
        async with anyio.create_task_group() as tg:
            for follow_up in new_events:
                tg.start_soon(handle_event, follow_up)
async def spin(msg: str) -> None:
    """Animate a text spinner followed by ``msg`` until the task is cancelled.

    A new frame is drawn every 0.1 seconds. When the task is cancelled the
    spinner line is blanked out and the cancellation is allowed to propagate.

    Args:
        msg: Text shown to the right of the spinner character.
    """
    status = ""
    try:
        for char in itertools.cycle(r"\|/-"):
            status = f"\r{char} {msg}"
            print(status, flush=True, end="")
            await anyio.sleep(0.1)
    finally:
        # Cancellation arrives as a BaseException subclass, so an
        # ``except Exception`` around the sleep would never see it and the
        # line would never be cleared. ``finally`` runs on cancellation and
        # still lets the exception propagate to the task group.
        blanks = " " * len(status)
        print(f"\r{blanks}\r", end="")
async def slow_1():
    """Sleep for one second, then print ``slow_1``; returns ``None``."""
    delay_seconds = 1
    await anyio.sleep(delay_seconds)
    print("slow_1")
async def slow_2():
    """Sleep for two seconds, print ``slow_2`` and return ``"event"``."""
    delay_seconds = 2
    await anyio.sleep(delay_seconds)
    print("slow_2")
    outcome = "event"
    return outcome
async def slow_5():
    """Sleep for five seconds, then print ``slow_5``; returns ``None``."""
    delay_seconds = 5
    await anyio.sleep(delay_seconds)
    print("slow_5")
async def supervisor():
    """Show the spinner while ``"event"`` is being handled, then stop it."""
    async with anyio.create_task_group() as task_group:
        # The shielded scope keeps cancellation from an enclosing scope from
        # interrupting the handling below.
        with anyio.CancelScope(shield=True):
            task_group.start_soon(spin, "thinking!")
            await handle("event")
            # Handling is finished: cancel the group so the endless spinner
            # task ends and the task group can exit.
            task_group.cancel_scope.cancel()
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    anyio.run(supervisor)
Solution
There are a few ways you could do this, but here's one that should have almost the same API:
from collections.abc import Awaitable, Iterable
from typing import TypeVar
import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup
T = TypeVar("T")
def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> Iterable[Awaitable[T]]:
    """Start every awaitable in ``tg`` and hand back results in completion order.

    Mirrors the calling convention of ``asyncio.as_completed``: the returned
    iterable yields one awaitable per input, and awaiting the i-th one gives
    the result of whichever input finished i-th.

    Args:
        tg: Task group in which the awaitables are run.
        aws: Awaitables to run; may be any iterable, including a one-shot
            iterator such as a generator.

    Returns:
        A lazy iterable with exactly as many awaitables as ``aws`` had items.
    """
    # ``aws`` is needed twice (to spawn tasks and to size the output), so it is
    # materialized first; iterating a generator twice would yield no results.
    pending = list(aws)
    send_stream, receive_stream = create_memory_object_stream()

    async def populate_result(a: Awaitable[T]) -> None:
        # Forward the finished result to whoever awaits next.
        await send_stream.send(await a)

    async def wait_for_result() -> T:
        return await receive_stream.receive()

    for a in pending:
        tg.start_soon(populate_result, a)
    return (wait_for_result() for _ in pending)
async def main():
    """Example: await the results of three slow coroutines as they finish."""
    async with anyio.create_task_group() as tg:
        # Uses the slow_* helpers that are actually defined (slow_1, slow_2, slow_5).
        coroutines = [slow_1(), slow_2(), slow_5()]
        for coroutine in as_completed(tg, coroutines):
            result = await coroutine
            # do stuff with result


anyio.run(main)
If you don't mind changing the API slightly, we can simplify a bit more:
from collections.abc import Awaitable, Iterable, AsyncIterable
from typing import TypeVar
import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup
T = TypeVar("T")
def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> AsyncIterable[T]:
    """Start every awaitable in ``tg`` and stream results in completion order.

    Args:
        tg: Task group in which the awaitables are run.
        aws: Awaitables to run.

    Returns:
        The receiving end of a memory object stream. Iterating it with
        ``async for`` yields each result as it becomes available and stops
        once every awaitable has delivered its result.
    """
    send_stream, receive_stream = create_memory_object_stream()

    async def populate_result(a: Awaitable[T], sender) -> None:
        # Each task owns one clone of the send stream and closes it when done.
        async with sender:
            await sender.send(await a)

    try:
        for a in aws:
            tg.start_soon(populate_result, a, send_stream.clone())
    finally:
        # Close the original so only the per-task clones keep the stream open.
        # Once the last clone is closed the receiver sees end-of-stream;
        # without this an ``async for`` over the result would wait forever.
        send_stream.close()
    return receive_stream
async def main():
    """Example: iterate over results of three slow coroutines as they finish."""
    async with anyio.create_task_group() as tg:
        # Uses the slow_* helpers that are actually defined (slow_1, slow_2, slow_5).
        coroutines = [slow_1(), slow_2(), slow_5()]
        async for result in as_completed(tg, coroutines):
            # do stuff with result
            ...  # placeholder: a loop body containing only a comment is a SyntaxError


anyio.run(main)
I chose to use a TypeVar named T everywhere, but you could consider using Any instead. That would let you use this with coroutines of mixed result types.
DISCLAIMER: I haven't actually run this code, but the approach should work just fine with minor modifications if necessary.
Answered By - Mezuzza
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.