Issue
I'm studying Python's asyncio and I'm having trouble creating an endless non-blocking loop.
I noticed that my tasks are stuck pending because I'm not awaiting them, but if I await them, the endless loop is blocked.
How can I manage this?
import random
import asyncio

must_stop = False
background_tasks = set()

async def process():
    global must_stop
    await asyncio.sleep(5)
    result = random.randint(10, 20)
    if result == 15:
        must_stop = True

async def get_msg():
    await asyncio.sleep(3)

async def main():
    global must_stop
    global background_tasks
    while not must_stop:
        await get_msg()
        task = asyncio.create_task(process())
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

asyncio.run(main())
EDIT: After some answers, I noticed that my example was wrong. What do I have to change to make the following code work?
Non-async message function:
import random
import asyncio

must_stop = False
background_tasks = set()

async def process(msg):
    global must_stop
    await asyncio.sleep(5)
    if msg == 15:
        must_stop = True

def get_msg():
    return random.randint(10, 20)

async def main():
    global must_stop
    global background_tasks
    while not must_stop:
        msg = get_msg()
        task = asyncio.create_task(process(msg))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

asyncio.run(main())
I'm trying to create an endless non-blocking loop to emulate a consumer flow, but the process tasks aren't being processed.
Solution
I think your code mostly does what you want: your process task runs concurrently with your loop in main; waiting for a new msg to arrive does not prevent process from running.
I think your only problem is that your code may exit prematurely: if process sets must_stop, your program may exit while there are still pending tasks. You can fix that by calling asyncio.gather after your loop exits, like this:
import random
import asyncio

must_stop = False
background_tasks = set()

async def process(msg):
    global must_stop
    print(f'start processing {msg}')
    await asyncio.sleep(5)
    print(f'stop processing {msg}')
    if msg == 15:
        must_stop = True

async def get_msg():
    await asyncio.sleep(3)
    return random.randint(10, 20)

async def main():
    global must_stop
    global background_tasks
    while not must_stop:
        msg = await get_msg()
        task = asyncio.create_task(process(msg))
        background_tasks.add(task)
    print('collecting remaining tasks')
    await asyncio.gather(*background_tasks)

asyncio.run(main())
Running this will produce something like:
start processing 12
start processing 20
stop processing 12
start processing 15
stop processing 20
start processing 18
stop processing 15
collecting remaining tasks
start processing 10
stop processing 18
stop processing 10
Here you can see there were two tasks still executing after must_stop caused the loop to exit.
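To make the "pending tasks at exit" point easier to check, here is a small deterministic sketch of the same pattern. It is not the code from the answer: the consume function, the finished list, the fixed message list, and the short delays are all assumptions made up for this illustration, and the stop check is done inside the loop rather than inside process so that the run is repeatable.

```python
import asyncio


async def process(msg, finished):
    # Simulate slow work; the delay is an arbitrary value chosen for the demo.
    await asyncio.sleep(0.05)
    finished.append(msg)


async def consume(messages, stop_value):
    """Spawn one task per message, stop at stop_value, then drain the rest."""
    finished = []
    background_tasks = set()
    for msg in messages:
        await asyncio.sleep(0.01)  # stands in for waiting on the next message
        background_tasks.add(asyncio.create_task(process(msg, finished)))
        if msg == stop_value:
            break
    # Count the tasks that have not finished at the moment the loop exits.
    pending_at_exit = sum(1 for task in background_tasks if not task.done())
    # Without this gather, returning here would abandon the unfinished tasks.
    await asyncio.gather(*background_tasks)
    return finished, pending_at_exit


if __name__ == "__main__":
    finished, pending = asyncio.run(consume([12, 20, 15, 18], stop_value=15))
    print(f"{pending} task(s) were still pending when the loop exited")
    print(f"finished: {finished}")
```

Every message seen before the stop value ends up in finished, even though at least one task had not completed when the loop ended.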
Update
It is possible to use asyncio to run a synchronous function in a background thread; see the loop.run_in_executor method. That might look like:
import random
import asyncio
import time

must_stop = False
background_tasks = set()

async def process(msg):
    global must_stop
    print(f'start processing {msg}')
    await asyncio.sleep(5)
    print(f'stop processing {msg}')
    if msg == 15:
        must_stop = True

def get_msg():
    time.sleep(3)
    return random.randint(10, 20)

async def main():
    global must_stop
    global background_tasks
    loop = asyncio.get_running_loop()
    while not must_stop:
        msg = await loop.run_in_executor(None, get_msg)
        task = asyncio.create_task(process(msg))
        background_tasks.add(task)
    print('collecting remaining tasks')
    await asyncio.gather(*background_tasks)

asyncio.run(main())
The behavior is identical to the previous example.
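The reason the thread matters: in the edited question, the loop in main never awaits anything, so the event loop never gets a chance to run the process tasks. Handing the blocking call to a thread and awaiting it gives control back to the event loop. On Python 3.9 and later, asyncio.to_thread is a shorter way to do this with the default executor. The sketch below is only an illustration under assumptions: ticker and fetch_without_blocking are hypothetical names, and the short sleep stands in for a slow synchronous message source.

```python
import asyncio
import time


def get_msg():
    # Blocking call; a short sleep stands in for a slow synchronous source.
    time.sleep(0.05)
    return 15


async def ticker(ticks):
    # Runs on the event loop; it only advances if the loop is not blocked.
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def fetch_without_blocking():
    ticks = []
    background = asyncio.create_task(ticker(ticks))
    # Run the blocking function in a worker thread (Python 3.9+).
    msg = await asyncio.to_thread(get_msg)
    # Stop the ticker and wait for the cancellation to complete.
    background.cancel()
    await asyncio.gather(background, return_exceptions=True)
    return msg, len(ticks)


if __name__ == "__main__":
    msg, tick_count = asyncio.run(fetch_without_blocking())
    print(f"got {msg}; ticker ran {tick_count} time(s) while get_msg blocked")
```

If get_msg were called directly inside the coroutine instead, the ticker would not run at all during the blocking call.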
Answered By - larsks