Issue
If I have a slow consumer of an async generator that emits values at a high rate, and I only care about consuming the latest value (i.e. I don't mind dropping values), is there an elegant way to achieve this? I've taken a look at aiostream, but I couldn't find anything that fits.
Here is a simple example:
import asyncio
import aiostream

async def main():
    xs = aiostream.stream.count(interval=0.2)
    async with xs.stream() as stream:
        async for x in stream:  # do something here to drop updates that aren't processed in time
            print(x)
            await asyncio.sleep(1.0)

if __name__ == "__main__":
    asyncio.run(main())
Solution
I suggest using a class that wraps the external generator, since I don't know of an existing library that does this.
The class consumes the generator internally in a background task and keeps only the last value. It acts as a wrapper around the generator you actually want to consume.
import asyncio

class RelaxedGenerator:
    def __init__(self, async_gen):
        self.last_value = None      # the last value generated
        self.consumed_last = True   # flags the last value as consumed
        self.async_gen = async_gen  # generator whose values we may drop
        self.exhausted = False      # flags the generator as fully consumed

    @classmethod
    async def start(cls, async_gen):
        self = cls(async_gen())
        # keep a reference so the background task is not garbage-collected
        self.task = asyncio.create_task(self.generate())
        return self

    async def generate(self):
        # consume the external async generator here
        # and keep only the last value for further processing
        while True:
            try:
                self.last_value = await self.async_gen.__anext__()
                self.consumed_last = False
            except StopAsyncIteration:
                self.exhausted = True
                break

    async def stream(self):
        while not self.exhausted:
            if self.consumed_last:
                await asyncio.sleep(0.01)  # poll without blocking the event loop
                continue
            self.consumed_last = True
            yield self.last_value
Testing with a simple generator:
import asyncio
from random import uniform

async def numbers_stream(max_=100):
    next_int = -1
    while next_int < max_:
        next_int += 1
        yield next_int
        await asyncio.sleep(0.2)

async def main():
    gen = await RelaxedGenerator.start(numbers_stream)
    async for value in gen.stream():
        print(value, end=", ", flush=True)
        await asyncio.sleep(uniform(1, 2))

asyncio.run(main())
Output (varies between runs, because the consumer sleeps for a random time):
0, 6, 15, 21, 28, 38, 43, 48, 57, 65, 73, 81, 89, 96,
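Two other things to keep in mind: whether you need to process the final value, and whether the generator you are working with will actually be exhausted in practice. Here I assume that you don't care about the final value and that the generator can be exhausted. In particular, stream() stops as soon as the source is exhausted, so a final value that has not been consumed yet is never yielded.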
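If you do care about the final value, or want to avoid the sleep(0.01) polling, one possible variation is to let the background task wake the consumer through an asyncio.Event. The following is only a minimal sketch under those assumptions, not a tested library: the names latest, pump and ticker are made up for illustration. Unlike the class above, it takes an already-created async generator object rather than the generator function.

```python
import asyncio


async def latest(source):
    """Yield only the most recent value of `source`, dropping stale ones.

    Hypothetical helper: `source` is any async iterable.
    """
    slot = None          # most recent value produced by the source
    has_value = False    # True while `slot` has not been consumed yet
    finished = False     # True once the source is exhausted (or failed)
    wakeup = asyncio.Event()

    async def pump():
        # Background task: drain the source, overwriting the slot each time.
        nonlocal slot, has_value, finished
        try:
            async for item in source:
                slot, has_value = item, True
                wakeup.set()
        finally:
            finished = True
            wakeup.set()

    task = asyncio.create_task(pump())
    try:
        while True:
            if not has_value:
                if finished:
                    break
                # Sleep until the pump stores a value or finishes.
                await wakeup.wait()
                wakeup.clear()
                continue
            value, has_value = slot, False
            yield value
        await task  # re-raise an exception from the source, if any
    finally:
        task.cancel()  # no-op if the pump already finished


async def ticker(n, interval):
    # Illustrative fast producer: 0, 1, ..., n - 1.
    for i in range(n):
        yield i
        await asyncio.sleep(interval)


async def main():
    async for value in latest(ticker(20, 0.05)):
        print(value, end=", ", flush=True)
        await asyncio.sleep(0.3)  # slow consumer


if __name__ == "__main__":
    asyncio.run(main())
```

Because the slot is only cleared when the consumer takes a value, the last value produced before exhaustion is still delivered. Cancelling the pump task in the finally block also stops the background consumption if the consumer closes the generator or stops iterating early.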
Answered By - svex99