Issue
We have created a service using FastAPI. When the service starts, it creates a few Python objects that the endpoints then use to store and retrieve data.
In production, FastAPI runs with multiple workers. Our problem is that each worker creates its own copy of these objects rather than sharing a single one.
The script below shows a (simplified) example of what we are doing, though in our case the usage of `Meta()` is considerably more complex.
```python
from fastapi import FastAPI, status

class Meta:
    def __init__(self):
        self.count = 0

app = FastAPI()
meta = Meta()

# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta.count += 1
    return status.HTTP_200_OK

# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    return {'count': meta.count}

# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta.count = 0
    return status.HTTP_200_OK
```
As mentioned above, the problem with multiple workers is that each one has its own `meta` object. Please be aware that the issue is not visible when running the API with a single worker.

More explicitly, when we hit the `/increment` endpoint for the first time, only one of the two workers responds to the call (this is correct; we don't want both workers doing the same thing). However, because there are two separate `meta` objects, only one of them is incremented.

When hitting the `/report` endpoint, either 1 or 0 is returned, depending on which worker responds to the request.

The question, then, is: how do we get the workers to share and operate on the same object?

As a side question, the problem above affects the `/reset` endpoint too: if it is called, only one of the workers resets its object. Is there a way to force all workers to respond to a single call on an endpoint?
Thanks!
Edit: I forgot to mention that we have tried (without success) to store the `meta` object in `app.state` instead. Essentially:

```python
app.state.meta = Meta()

...

@app.get("/report")
async def report():
    return {'count': app.state.meta.count}
```
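Storing the object on `app.state` cannot help, because each uvicorn worker is a separate operating-system process with its own memory: a module-level object, and anything hung off `app`, is duplicated per process. A minimal standard-library sketch of this behavior (the `bump` function and queue are illustrative, not part of the app):

```python
import multiprocessing as mp

count = 0  # module-level state, like the meta object in the app

def bump(queue):
    global count
    count += 1          # each process increments its own private copy
    queue.put(count)

if __name__ == "__main__":
    queue = mp.Queue()
    workers = [mp.Process(target=bump, args=(queue,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    results = sorted(queue.get() for _ in workers)
    print(results)  # [1, 1] -- each process saw only its own copy, not [1, 2]
    print(count)    # 0 -- the parent's copy is untouched
```

This is exactly what the `/report` endpoint observes: each worker reports the counter of its own private copy.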
Solution
There is no straightforward way to share a Python object between different processes.
The facilities in the `multiprocessing` module (such as managers and shared memory) are not well suited to sharing resources between web workers: they require a master process that creates and owns the resources, and they provide no durability.
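To make the "requires a master process" point concrete, here is a small sketch of a `multiprocessing.Manager` counter. It works only because this script itself spawns the worker processes; uvicorn/gunicorn workers are spawned by the server's own process manager, which knows nothing about your manager object, and the shared state disappears as soon as the manager process exits:

```python
from multiprocessing import Manager, Process

def bump(shared, lock):
    with lock:                  # read-modify-write is not atomic without a lock
        shared["count"] += 1

if __name__ == "__main__":
    with Manager() as manager:  # the manager runs in a dedicated master process
        shared = manager.dict(count=0)
        lock = manager.Lock()
        workers = [Process(target=bump, args=(shared, lock)) for _ in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(shared["count"])  # 2 -- only because this script owned the workers
    # once the manager process exits, the shared dict is gone: no durability
```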
The preferred means of sharing resources between workers are:

- Databases - for resources of a persistent nature that require reliable storage and scalability. Examples: `PostgreSQL`, `MariaDB`, `MongoDB`, and many others.
- Caches (key/value) - for data of a temporary nature; faster than databases, but without the same scalability and often not ACID-compliant. Examples: `Redis`, `Memcached`, etc.
Below I present two very simple examples of how both approaches can be used to share data between workers in a `FastAPI` application. As examples, I took the `aiocache` library with `Redis` as backend and the `Tortoise ORM` library with `PostgreSQL` as backend. Since `FastAPI` is an asynchronous framework, I chose `asyncio`-based libraries.
The structure of the test project is as follows:
```
.
├── app_cache.py
├── app_db.py
├── docker-compose.yml
├── __init__.py
```
Docker-compose file:
For experiments, you can use the following docker-compose file, exposing ports `5432` (Postgres) and `6379` (Redis) to `localhost`.
```yaml
version: '3'

services:
  database:
    image: postgres:12-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: test_pass
      POSTGRES_USER: test_user
      POSTGRES_DB: test_db

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
```
Starting:

```shell
docker-compose up -d
```
Cache (aiocache)
Aiocache provides 3 main entities:

- backends: allow you to specify which backend you want to use for your cache. Currently supported: `SimpleMemoryCache`, `RedisCache` (using `aioredis`) and `MemCache` (using `aiomcache`).
- serializers: serialize and deserialize the data between your code and the backends. This allows you to save any Python object in your cache. Currently supported: `StringSerializer`, `PickleSerializer`, `JsonSerializer`, and `MsgPackSerializer`. You can also build custom ones.
- plugins: implement a hooks system that allows extra behavior to be executed before and after each command.
Starting:

```shell
uvicorn app_cache:app --host localhost --port 8000 --workers 5
```
```python
# app_cache.py
import os

from aiocache import Cache
from fastapi import FastAPI, status

app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")

class Meta:
    def __init__(self):
        pass

    async def get_count(self) -> int:
        return await cache.get("count", default=0)

    async def set_count(self, value: int) -> None:
        await cache.set("count", value)

    async def increment_count(self) -> None:
        await cache.increment("count", 1)

meta = Meta()

# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
    await meta.increment_count()
    return status.HTTP_200_OK

# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    count = await meta.get_count()
    return {'count': count, "current_process_id": os.getpid()}

# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
    await meta.set_count(0)
    return status.HTTP_200_OK
```
Database (Tortoise ORM + PostgreSQL)
Starting: For the sake of simplicity, we first run a single worker to create the schema in the database:

```shell
uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C]
uvicorn app_db:app --host localhost --port 8000 --workers 5
```
```python
# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise

class MetaModel(Model):
    count = fields.IntField(default=0)

app = FastAPI()

# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count += 1  # it would be better to do this in a transaction
    await meta.save()
    return status.HTTP_200_OK

# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    meta, is_created = await MetaModel.get_or_create(id=1)
    return {'count': meta.count}

# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count = 0
    await meta.save()
    return status.HTTP_200_OK

register_tortoise(
    app,
    db_url="postgres://test_user:test_pass@localhost:5432/test_db",  # don't expose login/pass in source; use environment variables
    modules={"models": ["app_db"]},
    generate_schemas=True,
    add_exception_handlers=True,
)
```
Answered By - alex_noname