Issue
My server exposes an API for resource-intensive rendering work. The job involves a GPU, so the server can handle only a single request at a time. The client should submit a job and immediately receive a 201 response; the processing itself can take up to a minute, and a few dozen requests may be queued at any given time.
Here's what I came up with, boiled down to a minimal reproducible example:
import time
import asyncio
from fastapi import FastAPI, status

app = FastAPI()
fifo_queue = asyncio.Queue()


async def process_requests():
    while True:
        name = await fifo_queue.get()  # Wait for a request from the queue
        print(name)
        time.sleep(10)  # A RESOURCE-INTENSIVE JOB THAT BLOCKS THE THREAD
        fifo_queue.task_done()  # Indicate that the request has been processed


@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_requests())  # Start the request processing task


@app.get("/render")
async def render(name):
    fifo_queue.put_nowait(name)  # Add the request parameter to the queue
    return status.HTTP_201_CREATED  # Return a 201 status code
The problem with this approach is that the server does not stay responsive: after the first request is sent, it is busy with it full time and does not respond to further requests as I had hoped. For example:
curl http://127.0.0.1:8000/render\?name\=001
In this example, simply replacing time.sleep(10) with await asyncio.sleep(10) solves the problem, but it does not in the real use case (though it possibly offers a clue as to what I am doing incorrectly).
Any ideas?
Solution
As you may have figured out already, the main issue in your example is that you run a synchronous blocking operation within an async def endpoint, which blocks the event loop, and hence, the entire server. As explained in this answer (please have a look at it for more details), if one has to use an async def endpoint, they could run such CPU-bound tasks in an external ProcessPool and then await it (using asyncio's loop.run_in_executor(), see the linked answer above for more details), which would return control back to the event loop until that task is complete. As explained in the linked answer, when using ProcessPoolExecutor on Windows, it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__' (as shown in the example below).
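The core pattern looks roughly like the following (a minimal sketch only; blocking_render and handle are placeholder names standing in for your actual GPU job and caller, not part of any library API):
import asyncio
from concurrent.futures import ProcessPoolExecutor


def blocking_render(name: str) -> str:
    # Placeholder for the real CPU/GPU-bound work that blocks the calling thread
    ...
    return 'ok'


async def handle(name: str, pool: ProcessPoolExecutor) -> str:
    loop = asyncio.get_running_loop()
    # The blocking function runs in a separate process; awaiting the returned
    # future yields control back to the event loop until the result is ready.
    return await loop.run_in_executor(pool, blocking_render, name)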
I would also suggest using a lifespan handler, as demonstrated in this answer and this answer, instead of the deprecated startup and shutdown event handlers, to start the process_requests function, as well as instantiate the asyncio.Queue() and the ProcessPoolExecutor, and then add them to the request.state, so that they can be shared by every request/endpoint, instead of creating new objects every time.
Further, I would suggest creating a unique ID for every incoming request and returning that ID to the client, so that they can use it to check on the status of their request, i.e., whether it is complete or still pending processing. You could save that ID to your database storage (or a key-value store, such as Redis), as explained in this answer; however, for simplicity and demo purposes, the example below uses a dict object for that purpose.
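If you later move the status storage to Redis, the updates could look roughly like this (a sketch assuming a Redis server on localhost and redis-py's asyncio client; the job: key prefix and function names are arbitrary):
import redis.asyncio as redis

r = redis.Redis(decode_responses=True)  # assumes Redis is reachable on localhost:6379


async def set_status(item_id: str, status: str):
    await r.set(f'job:{item_id}', status)  # e.g., 'Pending...', 'Processing...', 'Done.'


async def get_status(item_id: str) -> str | None:
    return await r.get(f'job:{item_id}')  # returns None if the ID is unknown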
Working Example
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from dataclasses import dataclass
from concurrent.futures import ProcessPoolExecutor
import time
import asyncio
import uuid


@dataclass
class Item:
    id: str
    name: str


# Computationally intensive task
def cpu_bound_task(item: Item):
    print(f"Processing: {item.name}")
    time.sleep(15)
    return 'ok'


async def process_requests(q: asyncio.Queue, pool: ProcessPoolExecutor):
    while True:
        item = await q.get()  # Get a request from the queue
        loop = asyncio.get_running_loop()
        fake_db[item.id] = 'Processing...'
        r = await loop.run_in_executor(pool, cpu_bound_task, item)
        q.task_done()  # Tell the queue that the processing of the task is complete
        fake_db[item.id] = 'Done.'


@asynccontextmanager
async def lifespan(app: FastAPI):
    q = asyncio.Queue()  # Note that asyncio.Queue() is not thread-safe
    pool = ProcessPoolExecutor()
    asyncio.create_task(process_requests(q, pool))  # Start the requests-processing task
    yield {'q': q, 'pool': pool}
    pool.shutdown()  # Free any resources the pool is using once the currently pending futures are done executing


fake_db = {}
app = FastAPI(lifespan=lifespan)


@app.get("/add")
async def add_task(request: Request, name: str):
    item_id = str(uuid.uuid4())
    item = Item(item_id, name)
    request.state.q.put_nowait(item)  # Add the request to the queue
    fake_db[item_id] = 'Pending...'
    return item_id


@app.get("/status")
async def check_status(item_id: str):
    if item_id in fake_db:
        return {'status': fake_db[item_id]}
    else:
        return JSONResponse("Item ID Not Found", status_code=404)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app)
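A client would then submit a job and poll its status using the returned ID, for instance (the <returned_id> below is a placeholder for whatever /add returns):
curl http://127.0.0.1:8000/add\?name\=001
curl http://127.0.0.1:8000/status\?item_id\=<returned_id>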
Note
In case you encounter any memory leak (i.e., memory that is no longer needed, but is not released) when (re)using a shared ProcessPoolExecutor, you could instead create a new instance of the ProcessPoolExecutor class for every request that needs to be processed and have it terminated (using the with statement) right after the processing is complete. Note, however, that creating and destroying many processes over and over could become computationally expensive. Example:
async def process_requests(q: asyncio.Queue):
    while True:
        # ...
        with ProcessPoolExecutor() as pool:
            r = await loop.run_in_executor(pool, cpu_bound_task, item)
        # ...
Answered By - Chris