Issue
In a general GUI-based script (framework) I have a Tkinter-based GUI. It runs asynchronously (this works). When I press the 'Start' button, processing starts, and because it is CPU-heavy it is done using multiprocessing (this works too). What doesn't work is reporting back from the processes (done this, done that) to the GUI in order to display the progress.
To deliver the messages from the processes, I use a multiprocessing.Queue. Since the async GUI cannot be fed from this queue directly, I use an asyncio.Queue to feed the GUI, and I have the mp_queue_to_async_queue() function to pick the messages out of mp_queue and put them into async_queue. In theory everything works, except that the mp_queue_to_async_queue() line blocks the print_out_async() line that follows it.
Here is some dummy code that reproduces the problem:
import multiprocessing
import os
import time
import asyncio
import random

N_PROCESSES = 2
N_ITER = 10
N_SEC = 1

async_queue = asyncio.Queue()

def worker_main(p_queue):
    print (_pid:=os.getpid(),"working")
    for i in range(N_ITER):
        some_random_time = N_SEC * random.random()
        p_queue.put(f"{i} - {_pid}: {some_random_time} sec")
        time.sleep(some_random_time)
    p_queue.put(None)

async def run():
    await mp_queue_to_async_queue() #This row is blocking
    await print_out_async()

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        print(f"-> {message}")
        await async_queue.put(message)
        if message == None:
            processes_finished += 1
            if processes_finished == N_PROCESSES:
                break

async def print_out_async():
    processes_finished = 0
    while True:
        b = await async_queue.get()
        print(f"<- {b}")
        if b == None:
            processes_finished += 1
            if processes_finished == N_PROCESSES:
                break

if __name__ == '__main__':
    mp_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=N_PROCESSES, initializer=worker_main, initargs=(mp_queue,))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    pool.terminate()
Solution
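Your villain is that you are awaiting the coroutine that contains the forwarding loop directly, so run() does not reach print_out_async() until that loop has finished. It has to be created as a task, and you have to ensure the asyncio event loop is given control on each iteration of the loop.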
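As a rough illustration of the difference between awaiting a coroutine directly and scheduling it as a task, here is a minimal sketch that uses only asyncio. The names producer, consumer, sequential and concurrent are hypothetical stand-ins for this example (they are not part of the question's code), and no multiprocessing is involved.

```python
import asyncio


async def producer(queue, log):
    # Hypothetical stand-in for mp_queue_to_async_queue().
    for i in range(3):
        await queue.put(i)
        log.append(f"put {i}")
        await asyncio.sleep(0)  # hand control back to the event loop
    await queue.put(None)  # sentinel: nothing more to send


async def consumer(queue, log):
    # Hypothetical stand-in for print_out_async().
    while (item := await queue.get()) is not None:
        log.append(f"got {item}")


async def sequential():
    # Awaiting the producer directly: it runs to completion
    # before the consumer is even started.
    queue, log = asyncio.Queue(), []
    await producer(queue, log)
    await consumer(queue, log)
    return log


async def concurrent():
    # Scheduling the producer as a task: it runs whenever
    # the consumer is suspended waiting on the queue.
    queue, log = asyncio.Queue(), []
    task = asyncio.create_task(producer(queue, log))  # keep a reference
    await consumer(queue, log)
    await task
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent())
print(sequential_log)
print(concurrent_log)
```

In the first log every "put" entry comes before the first "got" entry, which mirrors the behaviour described in the question. In the second log the consumer should start receiving items while the producer is still running.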
Besides that, mp_queue.get() is blocking by default, so nothing else can run on the event loop while it waits. You have to yield to the loop when there are no messages:
async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        message = mp_queue.get()
        ...
Simply check whether a message is ready; if not, yield control back to the event loop:
from queue import Empty as QueueEmpty

async def run():
    # a created task will run whenever other async code yields
    # to the loop.
    # Although in this case we can do "fire and forget",
    # it is important to keep a reference to each created task:
    queues_task = asyncio.create_task(mp_queue_to_async_queue())  # scheduled as a task, no longer blocks run()
    await print_out_async()

async def mp_queue_to_async_queue():
    processes_finished = 0
    while True:
        try:
            message = mp_queue.get_nowait()
        except QueueEmpty:
            # nothing to read right now: let other tasks run
            await asyncio.sleep(0)
            continue
        ...
        print(f"-> {message}")
        ...
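For reference, the pieces above could be assembled into a self-contained sketch along these lines. Several things here are assumptions made for the example and not part of the original code: plain threads stand in for the pool's worker processes so the sketch runs without a __main__ guard, the queues are passed as arguments instead of being globals, the consumer is called collect() and returns a list instead of printing, and POLL_INTERVAL is a hypothetical constant (the answer uses asyncio.sleep(0); a small positive delay is used here so the polling loop does not spin a CPU core).

```python
import asyncio
import multiprocessing
import threading
from queue import Empty as QueueEmpty

N_WORKERS = 2
N_ITER = 3
POLL_INTERVAL = 0.01  # assumption: seconds to wait when the queue is empty


def worker(mp_queue, worker_id):
    # Assumption: a thread stands in for a worker process here.
    for i in range(N_ITER):
        mp_queue.put(f"{i} - worker {worker_id}")
    mp_queue.put(None)  # sentinel: this worker is done


async def mp_queue_to_async_queue(mp_queue, async_queue):
    finished = 0
    while finished < N_WORKERS:
        try:
            message = mp_queue.get_nowait()  # never blocks the event loop
        except QueueEmpty:
            await asyncio.sleep(POLL_INTERVAL)  # let other tasks run
            continue
        await async_queue.put(message)
        if message is None:
            finished += 1


async def collect(async_queue):
    # Plays the role of print_out_async(), but returns the messages.
    received, finished = [], 0
    while finished < N_WORKERS:
        message = await async_queue.get()
        if message is None:
            finished += 1
        else:
            received.append(message)
    return received


async def run():
    mp_queue = multiprocessing.Queue()
    async_queue = asyncio.Queue()
    workers = [
        threading.Thread(target=worker, args=(mp_queue, n))
        for n in range(N_WORKERS)
    ]
    for w in workers:
        w.start()
    # Keep a reference to the task so it is not garbage collected.
    bridge_task = asyncio.create_task(
        mp_queue_to_async_queue(mp_queue, async_queue)
    )
    received = await collect(async_queue)
    await bridge_task
    for w in workers:
        w.join()
    return received


received = asyncio.run(run())
print(received)
```

With real worker processes, as in the question, the same bridging coroutine should apply unchanged, since multiprocessing.Queue.get_nowait() raises queue.Empty regardless of which process filled the queue. The process start-up would still need to sit under the if __name__ == '__main__': guard.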
Answered By - jsbueno