Issue
I have a sync generator function that I can't change (since it's in a library) that itself accepts a sync iterable.
But I would like to use it from an async context, passing it an async iterable, and without blocking the event loop during iteration. How can I do this?
For example, say we have `my_gen` that takes an iterable of integers and is designed to work in a sync context:
```python
sync_input_iterable = range(0, 10000)
sync_output_iterable = my_gen(sync_input_iterable)
for v in sync_output_iterable:
    print(v)
```
But I would like to use it like this:
```python
import asyncio

async def main():
    # Just for example purposes - there is no "arange" built into Python
    async def arange(start, end):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0)

    async_input_iterable = arange(0, 10000)
    async_output_iterable = # Something to do with `my_gen`
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())
```
What could the `# Something...` be to make this work?
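As an illustration of why some wrapper is needed, here is a minimal sketch that passes the async iterable straight to a sync generator. `my_gen` here is a hypothetical doubling generator standing in for the library function. The `for` loop inside it calls `iter()` on the async generator, which raises a `TypeError`; and even with a sync input, iterating `my_gen` directly inside a coroutine would run on the event loop's thread and block it.

```python
import asyncio


async def arange(start, end):
    # Same toy async generator as in the question
    for i in range(start, end):
        yield i
        await asyncio.sleep(0)


def my_gen(iterable):
    # Hypothetical stand-in for the library's sync generator function
    for a in iterable:
        yield a * 2


async def naive():
    gen = my_gen(arange(0, 3))  # creating the generator does not fail yet...
    try:
        return list(gen)  # ...but its for loop calls iter() on an async generator
    except TypeError as exc:
        return f"TypeError: {exc}"


print(asyncio.run(naive()))
```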
Solution
The `# Something...` could have two components:
- A `to_async_iter` function to convert the output of the generator function to an async iterable. This works by iterating over it in a thread using `asyncio.to_thread`, which avoids blocking the event loop.
- A `to_sync_iter` function to convert the input async iterable to a sync iterable to be passed to the generator function. This works by iterating over it from sync code using `asyncio.run_coroutine_threadsafe`, which makes sense since this code runs in a different thread because of `to_async_iter`. (Calling `run_coroutine_threadsafe(...).result()` from the event loop's own thread would deadlock, because the loop could never get to run the submitted coroutine.)
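The two building blocks can be seen in isolation in a small sketch; the function names `blocking_work`, `fetch_on_loop` and `worker` are hypothetical. `asyncio.to_thread` runs a blocking call in a worker thread, and from inside such a worker `asyncio.run_coroutine_threadsafe` hands a coroutine back to the loop and returns a `concurrent.futures.Future` whose `.result()` blocks only the worker.

```python
import asyncio
import threading


def blocking_work(x):
    # Runs in a worker thread of the loop's default executor
    return x * 2, threading.get_ident()


async def fetch_on_loop(x):
    # Runs on the event loop's thread
    await asyncio.sleep(0)
    return x + 1


def worker(loop):
    # Submit a coroutine to the loop from this worker thread, then block
    # only this thread (not the loop) until the result is ready
    future = asyncio.run_coroutine_threadsafe(fetch_on_loop(41), loop)
    return future.result()


async def main():
    loop_thread = threading.get_ident()
    doubled, work_thread = await asyncio.to_thread(blocking_work, 21)
    from_loop = await asyncio.to_thread(worker, asyncio.get_running_loop())
    return doubled, work_thread != loop_thread, from_loop


print(asyncio.run(main()))  # prints (42, True, 42)
```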
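```python
import asyncio

# Requires Python 3.10+ for the built-in aiter() and anext()


def to_sync_iter(async_iter, loop):
    # Must be iterated from a thread other than the one running `loop`
    async_it = aiter(async_iter)
    while True:
        try:
            # Schedule the next step on the loop and block this (worker) thread until it is done
            yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
        except StopAsyncIteration:
            break


async def to_async_iter(sync_iter):
    it = iter(sync_iter)
    done = object()

    def safe_next():
        # Converts StopIteration to a sentinel value to avoid:
        # TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
        try:
            return next(it)
        except StopIteration:
            return done

    while True:
        # Each next() call runs in a worker thread, so the event loop stays free meanwhile
        value = await asyncio.to_thread(safe_next)
        if value is done:
            break
        yield value
```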
Used for example as follows:
```python
async def main():
    async def arange(start, end):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0)

    # For example purposes
    def my_gen(iterable):
        for a in iterable:
            yield a * 2

    async_input_iterable = arange(0, 10000)
    sync_input_iterable = to_sync_iter(async_input_iterable, asyncio.get_running_loop())
    sync_output_iterable = my_gen(sync_input_iterable)
    async_output_iterable = to_async_iter(sync_output_iterable)
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())
```
This is inspired by the code at https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 that does something similar to wrap stream-zip to make it work in an async context.
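I can't really speak to its performance, but note that every item costs at least one round trip to a worker thread via `asyncio.to_thread` (plus one back to the event loop via `run_coroutine_threadsafe` when `to_sync_iter` is also used), so the per-item overhead can be significant for cheap generators.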
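To get a rough figure for a particular workload, a sketch like the following times the per-item cost with and without the thread hop. `my_gen` is again a hypothetical stand-in, and the numbers depend entirely on the machine and Python version, so none are quoted here.

```python
import asyncio
import time


async def to_async_iter(sync_iter):
    # Same helper as in the answer (output side only)
    it = iter(sync_iter)
    done = object()

    def safe_next():
        try:
            return next(it)
        except StopIteration:
            return done

    while True:
        value = await asyncio.to_thread(safe_next)
        if value is done:
            break
        yield value


def my_gen(iterable):
    # Hypothetical stand-in for the library generator
    for a in iterable:
        yield a * 2


async def measure(n):
    # Direct iteration on the loop thread (blocks the loop, shown only as a baseline)
    start = time.perf_counter()
    direct = list(my_gen(range(n)))
    direct_s = time.perf_counter() - start

    # Iteration through to_async_iter, one worker-thread hop per item
    start = time.perf_counter()
    wrapped = [v async for v in to_async_iter(my_gen(range(n)))]
    wrapped_s = time.perf_counter() - start

    assert direct == wrapped
    return direct_s / n, wrapped_s / n


per_item_direct, per_item_wrapped = asyncio.run(measure(2_000))
print(f"direct: {per_item_direct * 1e6:.2f} us/item, wrapped: {per_item_wrapped * 1e6:.2f} us/item")
```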
Answered By - Michal Charemza