Issue
I would like to know how to stream a DataFrame using FastAPI without having to save it to a CSV file on disk. So far, I have managed to stream data from the CSV file, but this was noticeably slower than returning a FileResponse. The /option7 endpoint below is what I'm trying to do.
My goal is to stream data from FastAPI backend without saving the DataFrame to a csv file.
Thank you.
import pandas as pd
from fastapi import FastAPI, Response, Query
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse

app = FastAPI()
df = pd.read_csv("data.csv")

@app.get("/option4")
def load_questions():
    return FileResponse(path="C:Downloads/data.csv", filename="data.csv")

@app.get("/option5")
def load_questions():
    def iterfile():
        with open('data.csv', mode="rb") as file_like:
            yield from file_like
    return StreamingResponse(iterfile(), media_type="text/csv")

@app.get("/option7")
def load_questions():
    def iterfile():
        #with open(df, mode="rb") as file_like:
        yield from df  # does not work as intended: iterating a DataFrame yields only its column labels
    return StreamingResponse(iterfile(), media_type="application/json")
Solution
Approach 1 (recommended)
As mentioned in this answer, as well as here and here, when the entire data (a DataFrame in your case) is already loaded into memory, there is no need to use StreamingResponse. StreamingResponse makes sense when you want to transfer real-time data, when you don't know the size of your output ahead of time and don't want to wait to collect it all before you start sending it to the client, or when a file that you would like to return is too large to fit into memory (for instance, if you have 8GB of RAM, you can't load a 50GB file) and hence you would rather load it into memory in chunks.
In your case, as the DataFrame is already loaded into memory, you should instead return a custom Response directly, after using the .to_json() method to convert the DataFrame into a JSON string, as described in this answer (see related posts here and here as well). Example:
from fastapi import Response

@app.get("/")
def main():
    return Response(df.to_json(orient="records"), media_type="application/json")
If you find the browser taking a while to display the data, you may want to have the data downloaded as a .json file to the user's device (which would be completed much faster), rather than waiting for the browser to display a large amount of data. You can do that by setting the Content-Disposition header in the Response using the attachment parameter (see this answer for more details):
@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.json"'}
    return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')
You could also return the data as a .csv file, using the .to_csv() method without specifying the path parameter (in which case the CSV data is returned as a string). Since using return df.to_csv() would result in displaying the data in the browser with \r\n characters included, you might find it better to put the CSV data in a Response instead, and specify the Content-Disposition header, so that the data will be downloaded as a .csv file. Example:
@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
    return Response(df.to_csv(), headers=headers, media_type="text/csv")
Approach 2
To use a StreamingResponse, you would need to iterate over the rows in the DataFrame, convert each row into a dictionary and subsequently into a JSON string, using either the standard json library or other faster JSON encoders, as described in this answer (the JSON string will later be encoded into bytes internally by FastAPI/Starlette, as shown in the source code here). Example:
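import json
from fastapi.responses import StreamingResponse

@app.get("/")
def main():
    def iter_df():
        # emit one JSON object per line (newline-delimited JSON)
        for _, row in df.iterrows():
            yield json.dumps(row.to_dict()) + '\n'
    return StreamingResponse(iter_df(), media_type="application/json")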
Iterating through Pandas objects is generally slow and not recommended. As described in this answer:
Iteration in Pandas is an anti-pattern and is something you should only do when you have exhausted every other option. You should not use any function with "iter" in its name for more than a few thousand rows or you will have to get used to a lot of waiting.
Update
As @Panagiotis Kanavos noted in the comments section below, using either .to_json() or .to_csv() on a DataFrame that is already loaded into memory would allocate the entire output string in memory as well, thus doubling the RAM usage or even worse. Hence, if the amount of data is so large that either method above may cause your system to slow down or crash (because of running out of memory), you should rather use StreamingResponse, as described earlier. You may find faster alternative methods to iterrows() in this post, as well as faster JSON encoders, such as orjson and ujson, as described in this answer and this answer.
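To illustrate the memory concern for CSV output, the sketch below converts the DataFrame slice by slice, so that only one slice's CSV text is allocated at a time instead of the whole output string. The name iter_csv, the chunk_size value, and the sample data are assumptions made for illustration; they do not come from the original answer.

```python
import pandas as pd


def iter_csv(frame: pd.DataFrame, chunk_size: int = 10_000):
    """Yield the DataFrame as CSV text in slices, writing the header only once."""
    # Note: an empty frame yields nothing at all, not even a header row.
    for start in range(0, len(frame), chunk_size):
        chunk = frame.iloc[start:start + chunk_size]
        # Only the first slice carries the column header.
        yield chunk.to_csv(index=False, header=(start == 0))


# Hypothetical sample data, for illustration only.
sample = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
chunks = list(iter_csv(sample, chunk_size=2))
```

The generator could then be handed to StreamingResponse with media_type="text/csv". A suitable chunk size depends on the row width and available memory, and would need to be measured rather than assumed.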
Alternatively, you could save the data to disk (you could also use a NamedTemporaryFile, as described here, here and here, which might further increase the performance of the app), then delete the DataFrame to release the memory, and return a FileResponse (you could also use a StreamingResponse instead, see this answer and this answer). You could even manually trigger garbage collection using gc.collect(), as shown in this answer; however, frequent calls to the garbage collector are discouraged, as it is a costly operation and may affect performance. Finally, have a BackgroundTask delete the file from disk after returning the response. Example:
from fastapi import BackgroundTasks
from fastapi.responses import FileResponse
import uuid
import os

@app.get("/")
def main(background_tasks: BackgroundTasks):
    global df  # required: `del df` below would otherwise make df a local name and raise UnboundLocalError
    filename = str(uuid.uuid4()) + ".csv"
    df.to_csv(filename)
    del df  # release the memory; the global DataFrame is no longer available to later requests
    background_tasks.add_task(os.remove, filename)
    return FileResponse(filename, filename="data.csv", media_type="text/csv")
    # or return StreamingResponse - see the linked answers above
The solution you choose should be based on your application's requirements (e.g., the number of users you expect to serve simultaneously, the size of the data, the response time, etc.), as well as your system's specifications (e.g., available memory for allocation). Additionally, since all calls to DataFrame's methods are synchronous, you should remember to define your endpoint with a normal def, so that it is run in an external threadpool; otherwise, if the endpoint were defined with async def, such a call would block the event loop and hence the server. Alternatively, you could use Starlette's run_in_threadpool() from the concurrency module, which will run the to_csv() or to_json() function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked. Please have a look at this answer for more details on def vs async def, as well as solutions for when one has to run synchronous blocking operations inside async def endpoints.
Answered By - Chris