Issue
I have a few coroutines running on one process (A) and one heavier unbounded job running on a separate process(B). I would like that heavier job to dispatch its results into a queue which is consumed by the original process (A).
Similar to this:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def process__heavy(pipe):
print("[B] starting...")
while True:
print(f"[B] Pipe queue: {pipe.qsize()}")
pipe.put_nowait(str(time.time()))
time.sleep(0.5)
async def coroutine__stats(pipe):
print("[A] starting...")
while True:
print(f"[A] Pipe queue: {pipe.qsize()}")
await asyncio.sleep(1)
async def main():
pipe = asyncio.Queue()
executor = ProcessPoolExecutor()
jobs = await asyncio.gather(
asyncio.get_running_loop().run_in_executor(executor, process__heavy, pipe),
coroutine__stats(pipe)
)
print(f"Finished with result: {jobs.result()}")
if __name__ == '__main__':
asyncio.run(main())
print("Bye.")
Outut
[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 0 <--- why zero?
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 0 <---
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 0 <---
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 0
[B] Pipe queue: 8
The original process (A) does not see any data put into the shared queue. I do not remember if in python you can do object sharing across processes or if is it all pickled and the only result you can get is when the process exits and returns?
What am I doing wrong and what would be the best way to create a data pipe between those 2 processes?
Solution
Use a multiprocessing.Manager()
to create the Queue
instead of asyncio.Queue
:
import multiprocessing as mp
# ...
pipe = mp.Manager().Queue()
With that change to the OP code:
[A] starting...
[A] Pipe queue: 0
[B] starting...
[B] Pipe queue: 0
[B] Pipe queue: 1
[A] Pipe queue: 2
[B] Pipe queue: 2
[B] Pipe queue: 3
[A] Pipe queue: 4
[B] Pipe queue: 4
[B] Pipe queue: 5
[A] Pipe queue: 6
[B] Pipe queue: 6
[B] Pipe queue: 7
[A] Pipe queue: 8
[B] Pipe queue: 8
Answered By - Mark Tolonen
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.