Issue
People talk a lot about using Parquet with pandas, and I am trying hard to understand whether we can utilize the full feature set of Parquet files when used with pandas. For instance, say I have a big Parquet file (partitioned on year) with 30 columns (including year, state, gender, last_name) and many rows. I want to load the Parquet file and perform a computation similar to the following:
import pandas as pd
df = pd.read_parquet("file.parquet")
df_2002 = df[df.year == 2002]
df_2002.groupby(["state", "gender"])["last_name"].count()
In this query only 4 of the 30 columns and only the year 2002 partition are used. That means we only want to bring in the columns and rows needed for this computation, and that is exactly what Parquet makes possible with predicate and projection pushdown (and why we use Parquet in the first place).
But I am trying to understand how this query behaves in pandas. Does it bring everything into memory the moment we call df = pd.read_parquet("file.parquet")? Or is some laziness applied here so that projection and predicate pushdown kick in? If not, what is the point of using pandas with Parquet? Is any of this possible with the arrow package out there?
Even though I haven't used dask, I am wondering whether this kind of situation is handled well in dask, since it evaluates lazily.
I am sure this kind of situation is handled well in the Spark world, but I am wondering how these situations are handled in local scenarios with packages like pandas, arrow, dask, ibis, etc.
Solution
I am trying hard to understand whether we can utilize the full feature set of Parquet files when used with pandas.
TL;DR: Yes, but you may have to work harder than if you used something like Dask.
For instance, say I have a big Parquet file (partitioned on year)
This is pedantic but a single parquet file is not partitioned on anything. Parquet "datasets" (collections of files) are partitioned. For example:
my_dataset/year=2002/data.parquet
my_dataset/year=2003/data.parquet
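If you point pyarrow at a layout like that, the partition column is reconstructed from the directory names. A minimal sketch (assuming a hypothetical my_dataset directory laid out hive-style as above):
import pyarrow.dataset as ds

# Hypothetical path, laid out as my_dataset/year=2002/... etc.
dataset = ds.dataset("my_dataset", format="parquet", partitioning="hive")

# "year" shows up as a regular column in the schema even though it is
# encoded in the directory names rather than stored inside the files.
print(dataset.schema)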
Does it bring everything into memory the moment we call df = pd.read_parquet("file.parquet")?
Yes. But...you can do better:
df = pd.read_parquet('/tmp/new_dataset', filters=[[('year','=', 2002)]], columns=['year', 'state', 'gender', 'last_name'])
The filters keyword will pass the filter down to pyarrow, which will apply it in a pushdown fashion both to the partitioning (e.g. to know which directories need to be read) and to the row group statistics.
The columns keyword will pass the column selection down to pyarrow, which will use it to read only the specified columns from disk.
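For a sense of what that delegation looks like, here is a rough sketch of the equivalent pyarrow.dataset calls (paths and columns taken from the example above; the exact internals of pandas' parquet engine may differ):
import pyarrow.dataset as ds

dataset = ds.dataset('/tmp/new_dataset', format='parquet', partitioning='hive')
table = dataset.to_table(
    columns=['year', 'state', 'gender', 'last_name'],  # projection pushdown
    filter=ds.field('year') == 2002,                    # predicate pushdown (partitions + row group stats)
)
df = table.to_pandas()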
Is any of this possible with the arrow package out there?
Everything in pandas' read_parquet call is handled behind the scenes by pyarrow (unless you switch to some other engine). Traditionally, the groupby would then be handled directly by pandas (well, maybe numpy), but pyarrow has some experimental compute APIs as well if you wanted to try doing everything in pyarrow.
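For example, with a reasonably recent pyarrow (7.0+ has Table.group_by), a sketch of doing the whole query in pyarrow might look like this (it assumes the /tmp/new_dataset example further below):
import pyarrow.dataset as ds

# Requires pyarrow >= 7.0 for Table.group_by
dataset = ds.dataset('/tmp/new_dataset', format='parquet', partitioning='hive')
table = dataset.to_table(
    columns=['state', 'gender', 'last_name'],
    filter=ds.field('year') == 2002,
)
counts = table.group_by(['state', 'gender']).aggregate([('last_name', 'count')])
print(counts.to_pandas())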
Even though I haven't used dask, I am wondering whether this kind of situation is handled well in dask, since it evaluates lazily.
In my understanding (I don't have a ton of experience with dask), when you say...
df_2002 = df[df.year == 2002]
df_2002.groupby(["state", "gender"])["last_name"].count()
...in a dask dataframe, then dask will figure out that it can apply predicate and projection pushdown, and it will do so when loading the data. So dask takes care of figuring out which filters to apply and which columns to load, which saves you from having to figure it out yourself ahead of time.
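A hedged sketch of what that might look like (whether the pushdown actually happens automatically depends on the dask version and its query optimizer):
import dask.dataframe as dd

# Lazy read; nothing is loaded yet
df = dd.read_parquet('/tmp/new_dataset')

df_2002 = df[df.year == 2002]
result = df_2002.groupby(["state", "gender"])["last_name"].count()

# Work happens here; ideally only the year=2002 files and the needed columns are read
print(result.compute())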
Complete example (you can use strace to verify that it is only loading one of the two parquet files, and only part of that file):
import pyarrow as pa
import pyarrow.dataset as ds
import pandas as pd
import shutil

# Start from a clean slate (ignore_errors avoids failing on the first run)
shutil.rmtree('/tmp/new_dataset', ignore_errors=True)

tab = pa.Table.from_pydict({
    "year":      ["2002", "2002", "2002", "2002", "2002", "2002", "2003", "2003", "2003", "2003", "2003", "2003"],
    "state":     ["HI", "HI", "HI", "HI", "CO", "CO", "HI", "HI", "CO", "CO", "CO", "CO"],
    "gender":    ["M", "F", None, "F", "M", "F", None, "F", "M", "F", "M", "F"],
    "last_name": ["Smi", "Will", "Stev", "Stan", "Smi", "Will", "Stev", "Stan", "Smi", "Will", "Stev", "Stan"],
    "bonus":     [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
})

# Write a hive-style dataset partitioned on year, e.g. /tmp/new_dataset/year=2002/...
ds.write_dataset(tab, '/tmp/new_dataset', format='parquet', partitioning=['year'], partitioning_flavor='hive')

# Both the partition filter and the column selection are pushed down to pyarrow
df = pd.read_parquet('/tmp/new_dataset', filters=[[('year', '=', 2002)]], columns=['year', 'state', 'gender', 'last_name'])

# df already contains only the year 2002 rows thanks to the filter above,
# so this line is kept only to mirror the original question
df_2002 = df[df.year == 2002]
print(df_2002.groupby(["state", "gender"])["last_name"].count())
Disclaimer: You are asking about a number of technologies here. I work pretty closely with the Apache Arrow project and thus my answer may be biased in that direction.
Answered By - Pace