Issue
I have a synchronous loop from which I want to feed asyncio tasks. So far I wrote this POC:
async def consumer(queue):
    while True:
        revision = await queue.get()
        await asyncio.sleep(revision/10 * random.random() * 2)
        queue.task_done()
        print(f'done working on {revision}')

def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on the synchronous loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.1)
    print('---- done producing')
    # loop.run_until_complete(task) # runs forever
    # loop.run_until_complete(asyncio.gather(task)) # runs forever
    loop.run_until_complete(queue.join()) # gives: Task was destroyed but it is pending!

main()
The code runs fine and produces exactly the output I want:
produced 1
produced 2
done working on 1
produced 3
produced 4
done working on 2
produced 5
produced 6
produced 7
done working on 3
produced 8
produced 9
---- done producing
done working on 4
done working on 5
done working on 6
done working on 7
done working on 8
done working on 9
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<consumer() done, defined at .../deleteme.py:8> wait_for=<Future cancelled>>
The only problem is the final warning (a debug message? This is Python 3.9.1), which is not an exception. I know I'm doing something wrong, but I can't figure out what exactly. Writing this with multiprocessing would solve it, but this problem has been bugging me for almost the whole afternoon.
Solution
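Your task runs consumer, which loops with while True, so the task never finishes, and that is what causes the problem. When main() returns, the task is still pending when it gets garbage-collected, which triggers the "Task was destroyed but it is pending!" warning. It is also why run_until_complete(task) runs forever: the while True loop never exits.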
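To see this in isolation, here is a minimal sketch (not the code from the question) showing that queue.join() only waits for the queued items, not for the consumer task itself. The names demo and item are made up for illustration, and asyncio.run is used for brevity; it cancels leftover tasks on exit, so this sketch should not print the warning.

```python
import asyncio


async def consumer(queue):
    # Same shape as the consumer in the question: no exit condition.
    while True:
        item = await queue.get()
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    task = asyncio.create_task(consumer(queue))
    for item in range(3):
        await queue.put(item)
    # join() returns as soon as every queued item was marked done ...
    await queue.join()
    # ... but the consumer is already waiting in queue.get() again,
    # so the task itself is still pending.
    return task.done()


if __name__ == "__main__":
    print("consumer finished:", asyncio.run(demo()))
```

So the queue being empty and the task being finished are two separate things, and only the second one avoids the warning.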
You have to stop this task. You could send a special value through the queue, e.g. 'stop', check for it with revision == 'stop', and use return to exit the while True loop:
while True:
    revision = await queue.get()
    if revision == 'stop':
        queue.task_done()
        print(f'done working on {revision}')
        return # <-- exit `task`
    await asyncio.sleep(revision/10 * random.random() * 2)
    queue.task_done()
    print(f'done working on {revision}')
You also have to send 'stop' as the last value in the queue:
loop.run_until_complete(queue.put('stop'))
Then all three versions work:
loop.run_until_complete(task) # now works
loop.run_until_complete(asyncio.gather(task)) # now works
loop.run_until_complete(queue.join()) # now works
Full working code:
import asyncio
import time
import random

async def consumer(queue):
    while True:
        revision = await queue.get()
        if revision == 'stop':
            # sentinel received: mark it done and leave the loop so the task can finish
            queue.task_done()
            print(f'done working on {revision}')
            return
        await asyncio.sleep(revision/10 * random.random() * 2)
        queue.task_done()
        print(f'done working on {revision}')

def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on the synchronous loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.5)
    print('---- done producing')
    print('produced stop')
    loop.run_until_complete(queue.put('stop'))
    loop.run_until_complete(task) # now works
    #loop.run_until_complete(asyncio.gather(task)) # now works
    #loop.run_until_complete(queue.join()) # now works

main()
Answered By - furas