Issue
Here's a minimal reproducible example of my FastAPI app. I have a strange behavior and I'm not sure I understand the reason.
I'm using ApacheBench (ab
) to send multiple requests as follows:
ab -n 1000 -c 50 -H 'accept: application/json' -H 'x-data-origin: source' 'http://localhost:8001/test/async'
FastAPI app
import time
import asyncio
import enum
from typing import Any
from fastapi import FastAPI, Path, Body
from starlette.concurrency import run_in_threadpool
app = FastAPI()
loop = asyncio.get_running_loop()
def sync_func() -> None:
time.sleep(3)
print("sync func")
async def sync_async_with_fastapi_thread() -> None:
await run_in_threadpool( time.sleep, 3)
print("sync async with fastapi thread")
async def sync_async_func() -> None:
await loop.run_in_executor(None, time.sleep, 3)
async def async_func() -> Any:
await asyncio.sleep(3)
print("async func")
@app.get("/test/sync")
def test_sync() -> None:
sync_func()
print("sync")
@app.get("/test/async")
async def test_async() -> None:
await async_func()
print("async")
@app.get("/test/sync_async")
async def test_sync_async() -> None:
await sync_async_func()
print("sync async")
@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
await sync_async_with_fastapi_thread()
print("sync async with fastapi thread")
Here's the ApacheBench results:
async with (asyncio.sleep) : *Concurrency Level: 50
- Time taken for tests: 63.528 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 15.74 [#/sec] (mean)
- Time per request: 3176.407 [ms] (mean)
- Time per request: 63.528 [ms] (mean, across all concurrent requests) Transfer rate: 1.97 [Kbytes/sec] received*
sync (with time.sleep): Concurrency Level: 50
- *Time taken for tests: 78.615 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 12.72 [#/sec] (mean)
- Time per request: 3930.751 [ms] (mean)
- Time per request: 78.615 [ms] (mean, across all concurrent requests) Transfer rate: 1.59 [Kbytes/sec] received*
sync_async (time sleep with run_in_executor) : *Concurrency Level: 50
- Time taken for tests: 256.201 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 3.90 [#/sec] (mean)
- Time per request: 12810.038 [ms] (mean)
- Time per request: 256.201 [ms] (mean, across all concurrent requests) Transfer rate: 0.49 [Kbytes/sec] received*
sync_async_fastapi (time sleep with run_in threadpool): *Concurrency Level: 50
- Time taken for tests: 78.877 seconds
- Complete requests: 1000
- Failed requests: 0
- Total transferred: 128000 bytes
- HTML transferred: 4000 bytes
- Requests per second: 12.68 [#/sec] (mean)
- Time per request: 3943.841 [ms] (mean)
- Time per request: 78.877 [ms] (mean, across all concurrent requests) Transfer rate: 1.58 [Kbytes/sec] received*
In conclusion, I'm experiencing a surprising disparity in results, especially when using run_in_executor, where I'm encountering significantly higher average times (12 seconds). I don't understand this outcome.
--- EDIT --- After AKX answer.
Here the code working as expected:
import time
import asyncio
from anyio import to_thread
to_thread.current_default_thread_limiter().total_tokens = 200
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=100)
def sync_func() -> None:
time.sleep(3)
print("sync func")
async def sync_async_with_fastapi_thread() -> None:
await run_in_threadpool( time.sleep, 3)
print("sync async with fastapi thread")
async def sync_async_func() -> None:
await loop.run_in_executor(executor, time.sleep, 3)
async def async_func() -> Any:
await asyncio.sleep(3)
print("async func")
@app.get("/test/sync")
def test_sync() -> None:
sync_func()
print("sync")
@app.get("/test/async")
async def test_async() -> None:
await async_func()
print("async")
@app.get("/test/sync_async")
async def test_sync_async() -> None:
await sync_async_func()
print("sync async")
@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
await sync_async_with_fastapi_thread()
print("sync async with fastapi thread")
Solution
Using run_in_threadpool()
Starlette's run_in_threadpool()
uses anyio.to_thread.run_sync()
, behind the scenes, which "will run the sync blocking function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked"—see this answer and AnyIO's Working with threads documentation for more details. Calling anyio.to_thread.run_sync()
—which internally calls AsyncIOBackend.run_sync_in_worker_thread()
—will return a coroutine that can be await
ed to get the eventual result of the sync function (e.g., result = await run_in_threadpool(...)
), and hence, FastAPI will still work asynchronously. As can be seen in Starlette's source code (link is given above), run_in_threadpool()
simply looks like this (supporting both sequence and keyword arguments):
async def run_in_threadpool(
func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
if kwargs: # pragma: no cover
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args)
As described in AnyIO's documentation:
Adjusting the default maximum worker thread count
The default AnyIO worker thread limiter has a value of
40
, meaning that any calls toto_thread.run_sync()
without an explicitlimiter
argument will cause a maximum of40
threads to be spawned. You can adjust this limit like this:from anyio import to_thread async def foo(): # Set the maximum number of worker threads to 60 to_thread.current_default_thread_limiter().total_tokens = 60
Note
AnyIO’s default thread pool limiter does not affect the default thread pool executor on
asyncio
.
Since FastAPI uses Startlette's concurrency
module to run requests/blocking functions in an external threadpool, the default value of the thread limiter is also applied, i.e., 40
threads maximum—see the relevant AsyncIOBackend.current_default_thread_limiter()
method that returns the CapacityLimiter
with the default number of threads. As described above, one can adjust that value, thus increasing the number of threads, which might lead to an improvement in performance results—always depending on the number of requests your API is expected to serve concurrently. For instance, if you expect the API to serve no more than 50 requests at a time, then set the maximum number of threads to 50—if you have synchronous/blocking background tasks/StreamingResponse
's generators (i.e., functions defined with normal def
instead of async def
), or use UploadFile
's operations as well, you could add more threads as required, as FastAPI actually runs all those in an external threadpool, using run_in_threadpool
—it is all explained in this answer in details.
Note that using the approach below, which was described here, would have the same effect on adjusting the number of worker threads:
from anyio.lowlevel import RunVar
from anyio import CapacityLimiter
RunVar("_default_thread_limiter").set(CapacityLimiter(60))
But, it would be best to follow the approach provided by AnyIO's official documentation (as shown earlier). It is also a good idea to have this done when the application starts up, using a lifespan
event handler, as demonstrated here.
Working Example 1
from fastapi import FastAPI
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync")
def test_sync() -> None:
time.sleep(3)
print("sync")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
Using ApacheBench, you could test the example above as follows, which will send 1000
requests in total with 50
being sent simultaneously at a time (-n
: Number of requests, -c
: Number of concurrent requests):
ab -n 1000 -c 50 "http://localhost:8000/sync"
Since the /sync
endpoint above is defined with normal def
instead of async def
, FastAPI will use run_in_threadpool()
, behind the scenes, to run it in a separate thread and await
it, thus ensuring that event loop (and hence, the main thread) does not get blocked due to the blocking operations (either blocking IO-bound or CPU-bound) that will be performed inside that endpoint.
While running a performance test on the example above, if you call the /get_available_threads
endpoint from your browser, e.g., http://localhost:8000/get_available_threads
, you would see that the amount of threads available is always 10 or above (since only 50 threads are used at a time in this test, but the thread limiter was set to 60
), meaning that setting the maximum number of threads on AnyIO's thread limiter to a number that is well above your needs, like 200
as shown in some other answer and in your recent example, wouldn't bring about any improvements in the performance; on the contrary, you would end up with a number of threads "sitting" there without being used. As explained earlier, the number of maximum threads should depend on the number of requests your API is expected to serve concurrently, as well as any other blocking tasks/functions that would run in the threadpool by FastAPI itself, under the hood (and of course, on the server machine's resources available).
The example below is the same as the one above, but instead of letting FastAPI itself to handle the blocking operation(s) inside the def
endpoint (by running the def
endpoint in the external threadpool and await
ing it), the endpoint is now defined with async def
(meaning that FastAPI will run it directly in the event loop), but inside the endpoint, run_in_threadpool()
is used (which returns an await
able) to run the blocking operation. Performing a benchmark test on the example below would yield similar results to the previous example.
Working Example 2
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from contextlib import asynccontextmanager
from anyio import to_thread
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
to_thread.current_default_thread_limiter().total_tokens = 60
yield
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async_run_in_tp")
async def test_sync_async_with_run_in_threadpool() -> None:
await run_in_threadpool(time.sleep, 3)
print("sync_async using FastAPI's run_in_threadpool")
@app.get('/get_available_threads')
async def get_available_threads():
return to_thread.current_default_thread_limiter().available_tokens
Using ApacheBench, you could test the example above as follows:
ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"
Using loop.run_in_executor()
with ThreadPoolExecutor
When using asyncio
's loop.run_in_executor()
—after obtaining the running event loop using asyncio.get_running_loop()
—one could pass None
to the executor
argument, which would lead to the default executor being used; that is, a ThreadPoolExecutor
. Note that when calling loop.run_in_executor()
and passing None
to the executor
argument, this does not create a new instance of a ThreadPoolExecutor
every time you do that; instead, a ThreadPoolExecutor
is only initialised once the first time you do that, but for subsequent calls to loop.run_in_executor()
with passing None
to the executor
argument, Python reuses that very same instance of ThreadPoolExecutor
(hence, the default executor). This can been seen in the source code of loop.run_in_executor()
. That means, the number of threads that can be created, when calling await loop.run_in_executor(None, ...)
, is limited to the default number of thread workers in the ThreadPoolExecutor
class.
As described in the documentation of ThreadPoolExecutor
—and as shown in its implementation here—by default, the max_workers
argument is set to None
, in which case, the number of worker threads is set based on the following equation: min(32, os.cpu_count() + 4)
. The os.cpu_count()
function reutrns the number of logical CPUs in the current system. As explained in this article, physical cores refers to the number of CPU cores provided in the hardware (e.g., the chips), while logical cores is the number of CPU cores after hyperthreading is taken into account. If, for instance, your machine has 4 physical cores, each with hyperthreading (most modern CPUs have this), then Python will see 8 CPUs and will allocate 12 threads (8 CPUs + 4) to the pool by default (Python limits the number of threads to 32 to "avoid consuming surprisingly large resources on multi-core machines"; however, one could always adjust the max_workers
argument on their own when using a custom ThreadPoolExecutor
, instead of using the default one). You could check the default number of worker threads on your system as follows:
import concurrent.futures
# create a thread pool with the default number of worker threads
pool = concurrent.futures.ThreadPoolExecutor()
# report the number of worker threads chosen by default
# Note: `_max_workers` is a protected variable and may change in the future
print(pool._max_workers)
Now, as shown in your original example, you are not using a custom ThreadPoolExecutor
, but instead using the default ThreadPoolExecutor
every time a request arrives, by calling await loop.run_in_executor(None, time.sleep, 3)
(inside the sync_async_func()
function, which is triggered by the /test/sync_async
endpoint). Assuming your machine has 4 physical cores with hyperthreading enabled (as explained in the example earlier), then the default number of worker threads for the default ThreadPoolExecutor
would be 12. That means, based on your original example and the /test/sync_async
endpoint that triggers the await loop.run_in_executor(None, time.sleep, 3)
function, your application could only handle 12 concurrent requests at a time. That is the main reason for the difference observed in the performance results when compared to using run_in_threadpool()
, which comes with 40
allocated threads by default.
One way to solve this is to create a new instance of ThreadPoolExecutor
(on your own, instead of using the default executor) every time a request arrives and have it terminated once the task is completed (using the with
statement), as shown below:
import concurrent.futures
import asyncio
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
await loop.run_in_executor(pool, time.sleep, 3)
While this should wok just fine, it would be best to instantiate a ThreadPoolExecutor
once at the application startup, adjust the number of worker threads as needed, and re-use that executor when required. Having said that, depending on the blocking task and/or external libraries you might be using for that task, if you ever encounter a memory leak after tasks are completed when re-using a ThreadPoolExecutor
—i.e., memory that is no longer needed, but is not released—you might find creating a new instance of ThreadPoolExecutor
each time, as shown above, more suitable (Note, however, that if this was a ProcessPoolExecutor
instead, creating and destroying many processes over and over could become computationally expensive).
Below is a complete working example, demonstrating how to create a re-usable custom ThreadPoolExecutor
. Calling the /get_active_threads
endpoint from your browser, e.g., http://localhost:8000/get_active_threads
, while running a performance test with ApacheBench (using 50
concurrent requests, as described in your question and as shown below), you would see that the number of active threads never goes above 51
(50 concurrent threads + 1, which is the main thread), despite setting the max_workers
argument to 60
in the example below. This is simply because, in this performance test, the application is never required to serve more than 50
requests at the same time. Also, ThreadPoolExecutor
won't spin new threads, if idle threads are available (thus saving resources)—see the relevant implementation part. Hence, again, initialising the ThreadPoolExecutor
with max_workers=100
, as shown in your recent update, would be unecessary, if you never expect your FastAPI application to serve more than 50 requests at a time.
Working Example
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import concurrent.futures
import threading
import asyncio
import time
@asynccontextmanager
async def lifespan(app: FastAPI):
pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
yield {'pool': pool}
pool.shutdown()
app = FastAPI(lifespan=lifespan)
@app.get("/sync_async")
async def test_sync_async(request: Request) -> None:
loop = asyncio.get_running_loop()
await loop.run_in_executor(request.state.pool, time.sleep, 3)
print("sync_async")
@app.get('/get_active_threads')
async def get_active_threads():
return threading.active_count()
Using ApacheBench, you could test the example above as follows:
ab -n 1000 -c 50 "http://localhost:8000/sync_async"
Final Notes
In general, you should always aim for using asynchronous code (i.e., using async
/await
), wherever is possible, as async
code, also known as coroutines, run in the event loop, which runs in the main thread and executes all tasks in that thread. That means there is only one thread that can take a lock on the interpreter. When dealing with sync blocking IO-bound tasks though, you could either (1) define your endpoint with def
and let FastAPI handle it behind the scenes as described earlier and in this answer, or (2) define your endpoint with async def
and use run_in_threadpool()
on your own to run that blocking task in a separate thread and await
it, or (3) use asyncio
's loop.run_in_executor()
with a custom (preferably re-usable) ThreadPoolExecutor
, adjusting the number of workers as required. When required to perform blocking CPU-bound tasks, while running such tasks in an external thread and await
ing them would successfully prevent the event loop from getting blocked, it wouldn't, however, provide the performance improvement you would expect from running code in parallel. Thus, for CPU-bound tasks, one may choose to use a ProcessPoolExecutor
instead (Note: when using processes in general, you need to explicitly protect the entry point with if __name__ == '__main__'
)—example on using a ProcessPoolExecutor
can be found in this answer.
To run tasks in the background, without waiting for them to complete in order to proceed with executing the rest of the code in an endpoint, you could use FastAPI's BackgroundTasks
, as shown here and here. If the background task function is defined with async def
, FastAPI will run it directly in the event loop, whereas if it is defined with normal def
, FastAPI will use run_in_threadpool()
and await
the returned coroutine (same concept as API endpoints). Another option when you need to run an async def
function in the background, but not necessarily having it trigerred after returning a FastAPI response (which is the case in BackgroundTasks
), is to use asyncio.create_task()
, as shown in this answer and this answer. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process, you may benefit from using other bigger tools such as Celery.
Finally, regarding the optimal/maximum number of worker threads, I would suggest reading this article (have a look at this article as well for more details on ThreadPoolExecutor
in general). As explained in the article:
It is important to limit the number of worker threads in the thread pools to the number of asynchronous tasks you wish to complete, based on the resources in your system, or on the number of resources you intend to use within your tasks.
Alternately, you may wish to increase the number of worker threads dramatically, given the greater capacity in the resources you intend to use.
[...]
It is common to have more threads than CPUs (physical or logical) in your system. The reason for this is that threads are used for IO-bound tasks, not CPU-bound tasks. This means that threads are used for tasks that wait for relatively slow resources to respond, like hard drives, DVD drives, printers, network connections, and much more.
Therefore, it is not uncommon to have tens, hundreds and even thousands of threads in your application, depending on your specific needs. It is unusual to have more than one or a few thousand threads. If you require this many threads, then alternative solutions may be preferred, such as
AsyncIO
.
Also, in the same article:
Does the Number of Threads in the
ThreadPoolExecutor
Match the Number of CPUs or Cores?The number of worker threads in the
ThreadPoolExecutor
is not related to the number of CPUs or CPU cores in your system.You can configure the number of threads based on the number of tasks you need to execute, the amount of local system resources you have available (e.g., memory), and the limitations of resources you intend to access within your tasks (e.g., connections to remote servers).
How Many Threads Should I Use?
If you have hundreds of tasks, you should probably set the number of threads to be equal to the number of tasks.
If you have thousands of tasks, you should probably cap the number of threads at hundreds or 1,000.
If your application is intended to be executed multiple times in the future, you can test different numbers of threads and compare overall execution time, then choose a number of threads that gives approximately the best performance. You may want to mock the task in these tests with a random sleep operation.
What Is the Maximum Number of Worker Threads in the
ThreadPoolExecutor
?There is no maximum number of worker threads in the
ThreadPoolExecutor
.Nevertheless, your system will have an upper limit of the number of threads you can create based on how much main memory (RAM) you have available.
Before you exceed main memory, you will reach a point of diminishing returns in terms of adding new threads and executing more tasks. This is because your operating system must switch between the threads, called context switching. With too many threads active at once, your program may spend more time context switching than actually executing tasks.
A sensible upper limit for many applications is hundreds of threads to perhaps a few thousand threads. More than a few thousand threads on a modern system may result in too much context switching, depending on your system and on the types of tasks that are being executed.
Answered By - Chris
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.