Issue
I have a FastAPI program that receives about 30-100 requests/sec, with request content ranging from 1 KB to 1 MB in size, and I'm running a regex match on that content. The regex is whitespace-tolerant, so I can split the content into multiple chunks and run the regex on each chunk in a different process. But since the code runs asynchronously, I need a way to keep the API running while the processes execute.
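As a minimal sketch of what "whitespace-tolerant" buys here, assuming the pattern's matches can never contain whitespace (true for the domain regex below): grouping whitespace-separated words into chunks and matching each chunk gives the same result as matching the whole text. The splitter `chunk_words` and the helper `domains_in_chunks` are hypothetical and differ from the character-offset splitter in the code that follows.

```python
import re

# Pattern from the question, with a non-capturing group so that findall()
# returns whole matches rather than the last captured group.
DOMAIN_PATT = re.compile(r"(?:[a-zA-Z0-9\-_]+\.)+[a-zA-Z0-9\-_]+")


def chunk_words(content: str, count: int) -> list:
    # Hypothetical splitter: group whitespace-separated words into ~count chunks.
    words = content.split()
    size = max(1, -(-len(words) // count))  # ceiling division
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


def domains_in_chunks(content: str, count: int) -> list:
    # Match each chunk independently and concatenate the results in order.
    found = []
    for chunk in chunk_words(content, count):
        found.extend(DOMAIN_PATT.findall(chunk))
    return found


text = "visit example.com or 'mail.example.org' then\nsub.domain.co.uk now"
# No match can contain whitespace, so chunking does not change the result.
assert domains_in_chunks(text, 3) == DOMAIN_PATT.findall(text)
print(domains_in_chunks(text, 3))  # ['example.com', 'mail.example.org', 'sub.domain.co.uk']
```

Each chunk is independent, which is what makes it safe to hand chunks to separate worker processes.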
So I tried using ProcessPoolExecutor with run_in_executor from asyncio. Here is the MVP code of what I did:
import asyncio
import concurrent.futures
import functools
import re

async def executor_task(fn, executor=None):
    # run a blocking callable in the given executor without blocking the event loop
    event_loop = asyncio.get_running_loop()
    return await event_loop.run_in_executor(executor, fn)
def split_on_whitespace(content: str, count: int) -> list:
    # cut content into about `count` chunks, only right after a whitespace or quote character
    if not content:
        return ['' for i in range(count)]
    part = max(1, len(content) // count)
    boundary = re.compile(r"[\"'\s]")
    splitted = []
    last_beg = 0
    while last_beg < len(content):
        # find the first boundary at or after the nominal end of this chunk
        match = boundary.search(content, last_beg + part)
        last_end = match.end() if match else len(content)
        splitted.append(content[last_beg:last_end])
        last_beg = last_end
    return splitted
def run_regex_on_content_chunk(content):
    domains = []
    domain_patt = re.compile(r'([a-zA-Z0-9\-_]+\.){1,}[a-zA-Z0-9\-_]+')  # extract domain name
    # finditer yields match objects; findall would return only the captured group strings
    for df in domain_patt.finditer(content):
        domains.append(content[df.start(): df.end()])
    return domains
@app.post("/addContent")
async def add_content(content: dict):
    all_content = content['data']
    nworkers = 6
    content_chunks = split_on_whitespace(all_content, nworkers)  # split content
    async_tasks = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for chunk in content_chunks:
            regex_fn = functools.partial(run_regex_on_content_chunk, chunk)  # make the function with args
            async_tasks.append(executor_task(regex_fn, executor))  # add to gather later
        await asyncio.gather(*async_tasks)  # gather
Running this code creates the processes, but the API hangs altogether. When I check the processes they seem to be idle, yet the API still hangs and is not usable at all, and execution seems never to leave the with statement.
PS: the split_on_whitespace and run_regex_on_content_chunk functions don't contain any infinite loops or other blocking code of any kind.
Solution
The problem is that you are confusing the lifetime and the cost of creating your resources.
An external process is a "huge" thing compared with, say, calling a function, and by creating a ProcessPoolExecutor inside your view code you spawn several new processes every time the view runs.
So your view goes from something that would take a few milliseconds to execute to something that may take several seconds. The whole idea of having a "process pool" is to have workers in several external processes pre-spawned and ready to process your data. Your code turns that on its head: it adds a lot of boilerplate heavy lifting to every request without adding anything to the processing of the data itself.
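A minimal sketch of that cost, assuming a trivial task (`square`) and hypothetical helper names: `per_call_pool` mimics creating a pool inside the view, while `shared_pool_call` reuses a pool that already exists. Exact timings depend on the OS and on the process start method, so none are claimed here; the per-call loop pays process start-up 20 times, the shared loop only once.

```python
import concurrent.futures
import time


def square(x: int) -> int:
    return x * x


def per_call_pool(x: int) -> int:
    # Anti-pattern: a brand-new pool (and its worker processes) for every call.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        return pool.submit(square, x).result()


def shared_pool_call(pool: concurrent.futures.Executor, x: int) -> int:
    # Pattern from the answer: reuse workers that already exist.
    return pool.submit(square, x).result()


if __name__ == "__main__":
    start = time.perf_counter()
    for i in range(20):
        per_call_pool(i)
    print(f"new pool per call: {time.perf_counter() - start:.3f}s")

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        start = time.perf_counter()
        for i in range(20):
            shared_pool_call(pool, i)
        print(f"shared pool:       {time.perf_counter() - start:.3f}s")
```

`shared_pool_call` accepts any `Executor`, so the same helper works with a thread pool when the work is not CPU-bound.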
The thing to do there is to have a single ProcessPoolExecutor instance for the lifetime of your server application. That in itself may be a bit hard to set up correctly; maybe you'd be better off working with something like Celery instead, which can handle failing workers and even run workers on different machines, giving you horizontal scaling for free. But changing the code for that is another level.
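The same idea can be sketched outside FastAPI with plain asyncio, assuming `serve` stands in for the server and each `handle_request` call stands in for one request (both names are hypothetical). The pool is created once, and concurrent requests fan their chunks out to it with `run_in_executor`, so the event loop keeps running while the workers do the regex work.

```python
import asyncio
import concurrent.futures
import re

# Non-capturing group, so findall() returns whole domain matches.
DOMAIN_PATT = re.compile(r"(?:[a-zA-Z0-9\-_]+\.)+[a-zA-Z0-9\-_]+")


def find_domains(chunk: str) -> list:
    # CPU-bound work that runs inside a pool worker.
    return DOMAIN_PATT.findall(chunk)


async def handle_request(pool: concurrent.futures.Executor, chunks: list) -> list:
    # One "request": send its chunks to the shared pool, then merge the results.
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(
        *(loop.run_in_executor(pool, find_domains, chunk) for chunk in chunks)
    )
    return [domain for part in results for domain in part]


async def serve(pool: concurrent.futures.Executor) -> list:
    # Several concurrent "requests" all share the same long-lived pool.
    requests = [
        ["see example.com", "and api.example.org"],
        ["nothing here", "but sub.domain.co.uk"],
    ]
    return await asyncio.gather(*(handle_request(pool, r) for r in requests))


if __name__ == "__main__":
    # Created once before serving and shut down once at the end.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        print(asyncio.run(serve(pool)))  # [['example.com', 'api.example.org'], ['sub.domain.co.uk']]
```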
For now, focusing on your minimal example, something along these lines, using the lifespan parameter when creating your FastAPI instance, might work. Below, I just changed the code in your view to reuse the same process pool, and added the code to create a single executor pool for each server process.
Just beware that there is another thing that may be causing your code to fail altogether (instead of just running 10000 times slower): the call that starts your FastAPI application must take place only in the main process. If it is not guarded, even indirectly, by a check for that (the if __name__ == "__main__": clause), then when the multiprocessing pool is created your code may simply be setting up several FastAPI servers in a runaway chain.
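A minimal sketch of the guard with plain multiprocessing (no FastAPI involved; `work` and `main` are hypothetical names). Pool workers started with the "spawn" start method re-import the main script under the name `__mp_main__`, so anything at module level runs again in every worker, while the guarded block runs only in the original process.

```python
import concurrent.futures


def work(x: int) -> int:
    return x * 2


def main() -> list:
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(work, range(5)))


# Module-level code runs in EVERY process that imports this file, including
# spawned pool workers, so keep it free of side effects such as starting a server.

if __name__ == "__main__":
    # Only the original process gets here. In the FastAPI case, this is
    # where the server start (e.g. uvicorn.run(app)) belongs.
    print(main())  # [0, 2, 4, 6, 8]
```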
...
@app.post("/addContent")
async def add_content(content: dict):
    all_content = content['data']
    nchunks = 6  # number of chunks each request is split into
    content_chunks = split_on_whitespace(all_content, nchunks)  # split content
    async_tasks = []
    for chunk in content_chunks:
        regex_fn = functools.partial(run_regex_on_content_chunk, chunk)  # make the function with args
        # reuse the single, server-wide pool created by the lifespan handler
        async_tasks.append(executor_task(regex_fn, process_pool))  # add to gather later
    await asyncio.gather(*async_tasks)
...
from contextlib import asynccontextmanager
from fastapi import FastAPI

process_pool = None

@asynccontextmanager
async def executor_pool(app):
    global process_pool
    nworkers = 18  # These workers are shared by all views.
    # Even though, at 100% CPU occupation, you should in theory get no
    # gain above the number of hardware threads you have,
    # I think that when we factor in the idle time for
    # networking and such, about 3X that number will
    # serve you well. Of course, CPU usage should be
    # monitored in production: as long as you don't get
    # close to 100% under maximum load, you can further
    # increase the number of workers.
    process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=nworkers)
    try:
        yield  # at this point, FastAPI will set up the server and run your application
    finally:
        process_pool.shutdown()  # and this runs when your server is stopping!

# Create the app with the lifespan handler (the line where you do this is not present
# in your example code). In the real module it must come before the @app.post views.
app = FastAPI(lifespan=executor_pool)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)  # only the main process may start the server
Answered By - jsbueno