Issue
I have a single threaded stream reading loop that needs some concurrency adding to it to improve performance, but I don't understand which is the correct solution to use, in particular because I need to have a "clock" that ticks even if the stream has no data.
In pseudo code:
while True:
wait until either (streamHasData) or (timeNow > lastLoopTime + minStep)
if streamHasData:
processData()
houseKeepingTasks()
lastLoopTime = timeNow()
Currently I have a readStream()
function that blocks until data appears. I could make that async, but then I'd still have to await it somewhere which would make it block again? So I don't see how asyncio helps? With concurrent futures I could run readStream
on a thread, then loop with time.sleep(0.01)
and keep checking if the readStreamFuture has finished, or if it is time to do houseKeepingTasks
. And for extra efficiency I can run houseKeepingTasks as another future.
I feel like asyncio should be able to help here, but I can't see how, since wherever I put the await, it will just block to that point, and I'm back where I started. I feel like I'm missing something fundamental about asyncio.
Solution
You misunderstand how Asyncio works. In particular this is incorrect:
I could make that [function] async, but then I'd still have to await it somewhere which would make it block again
The await
does not mean that the following expression "blocks". All it does is allow for a context switch by the event loop at that point. The surrounding coroutine basically tells the event loop:
"This could take a while. If you have something else to take care of, go do that. But please check back with me some time later."
This means the await
only blocks that coroutine, not the entire thread.
Here is a small demo for you:
from asyncio import (
Event,
TimeoutError,
create_task,
gather,
run,
sleep,
wait_for,
)
async def loop(flag: Event):
while True:
try:
await wait_for(flag.wait(), timeout=3)
except TimeoutError:
print("Took too long...")
else:
print("Something happened!")
flag.clear()
async def flag_setter(flag: Event):
i = 0
while True:
print("Iteration", i)
if i % 5 == 0:
flag.set()
await sleep(1)
i += 1
async def main():
flag = Event()
loop_task = create_task(loop(flag))
setter_task = create_task(flag_setter(flag))
await gather(loop_task, setter_task)
if __name__ == "__main__":
try:
run(main())
except KeyboardInterrupt:
print("\rStopped")
Output:
Iteration 0
Something happened!
Iteration 1
Iteration 2
Took too long...
Iteration 3
Iteration 4
Iteration 5
Something happened!
Iteration 6
Iteration 7
Took too long...
Iteration 8
Iteration 9
Iteration 10
Something happened!
Iteration 11
...
Stopped
Hope this helps and puts you on the right track.
PS
If you want to add a timeout to awaiting "something to happen", you can do that with asyncio.wait_for
. I expanded the example to show how you can use it.
Answered By - Daniil Fajnberg
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.