Issue
I am prototyping a FastAPI app with an endpoint that will launch a long-running process using the subprocess module. The obvious solution is to use concurrent.futures and ProcessPoolExecutor, but I am unable to get the behavior I want. Code sample:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random

pool = ProcessPoolExecutor(5)


def long_task(s):
    print("started")
    time.sleep(random.randrange(5, 15))
    sb.check_output(["touch", str(s)])
    print("done")


async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
    while True:
        print("in async task")
        done, _ = await asyncio.wait(tasks, timeout=1)
        for task in done:
            await task
        await asyncio.sleep(1)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task())


if __name__ == "__main__":
    main()
This sample works fine on the surface, but the spawned processes are not stopped after execution completes: I still see all of the python processes in ps aux | grep python. Shouldn't awaiting a completed task stop it? In the end I do not care much about the result of the execution; it should just happen in the background and exit cleanly, without any hanging processes.
Solution
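You must close the ProcessPoolExecutor when you are done using it, either by explicitly calling its shutdown() method or by using it as a context manager. The pool keeps its worker processes alive to serve further tasks, so awaiting a finished task only collects its result and does not stop the worker. I used the context manager approach.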
I don't know what subprocess.check_output does, so I commented it out.
I also replaced your infinite loop with a single call to asyncio.gather, which suspends the coroutine until all of the submitted tasks have finished.
I'm on Windows, so to observe the creation and deletion of processes I watched the Windows Task Manager. The program creates 5 subprocesses and closes them again when the ProcessPoolExecutor context manager exits.
import asyncio
from concurrent.futures import ProcessPoolExecutor
# import subprocess as sb
import time
import random


def long_task(s):
    print("started")
    time.sleep(random.randrange(5, 15))
    # sb.check_output(["touch", str(s)])
    print("done", s)


async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    with ProcessPoolExecutor(5) as pool:
        tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
        await asyncio.gather(*tasks)
    print("Completely done")


def main():
    asyncio.run(async_task())


if __name__ == "__main__":
    main()
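The other option mentioned above, calling shutdown() explicitly, could look roughly like the sketch below. It is only an illustration: the run_batch helper, its parameters, and the shortened sleep are assumptions made for the example and are not part of the original code. It also counts live child processes with multiprocessing.active_children() before and after the shutdown, as a check that does not depend on Task Manager or ps.

```python
import asyncio
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor


def long_task(s):
    # Stand-in for the real work; a short sleep keeps the demo quick.
    time.sleep(0.1)
    return s


async def run_batch(n_tasks=4, workers=2):
    # Hypothetical helper: runs n_tasks jobs, then shuts the pool down.
    loop = asyncio.get_running_loop()
    pool = ProcessPoolExecutor(workers)
    try:
        tasks = [loop.run_in_executor(pool, long_task, i) for i in range(n_tasks)]
        results = await asyncio.gather(*tasks)
        # All tasks are finished, but the workers are still alive and idle.
        alive_before = len(multiprocessing.active_children())
    finally:
        # Same effect as leaving the `with` block: wait for the workers to
        # finish, then let them exit. This call blocks until they are gone.
        pool.shutdown(wait=True)
    alive_after = len(multiprocessing.active_children())
    return results, alive_before, alive_after


if __name__ == "__main__":
    results, alive_before, alive_after = asyncio.run(run_batch())
    print("results:", results)
    print("workers alive before shutdown:", alive_before)
    print("workers alive after shutdown:", alive_after)
```

The try/finally ensures the pool is shut down even if one of the tasks raises. Note that shutdown(wait=True) blocks the event loop while it waits, just as leaving the with block does, so in a long-lived server it may be preferable to create the pool once at startup and shut it down once at exit.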
Answered By - Paul Cornelius