Issue
When calling dask.dataframe.to_parquet(data), if data was read via Dask with a given number of partitions, and you try to save it in Parquet format after having dropped some columns, it fails with an error such as:
FileNotFoundError: [Errno 2] No such file or directory: 'part.0.parquet'
Has anyone encountered the same issue?
Here is a minimal example. Note that way 1 works as expected, while way 2 does NOT:
import numpy as np
import pandas as pd
import dask.dataframe as dd
# -------------
# way 1 - works
# -------------
print('way 1 - start')
A = np.random.rand(200,300)
cols = np.arange(0, A.shape[1])
cols = [str(col) for col in cols]
df = pd.DataFrame(A, columns=cols)
ddf = dd.from_pandas(df, npartitions=11)
# compute and resave
ddf.drop(cols[0:11], axis=1)
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 1 - end')
# ----------------------
# way 2 - does NOT work
# ----------------------
print('way 2 - start')
ddf = dd.read_parquet('error.parquet')
# compute and resave
ddf.drop(cols[0:11], axis=1)
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 2 - end')
Solution
Oh, great, you've spotted a bug caused by the overwrite=True option. When overwrite=True is set, Dask removes the target path first, see these lines. In your example, ddf is lazy, so by the time the data is actually written, Dask tries to read the source files, but they are already gone.
One workaround is to save the new dataframe to a different path, then remove the old folder and move the new dataframe's folder to the old location (some of the options are here).
Another option is to load the ddf into memory (if it fits) and then use your code:
print('way 2 - start')
ddf = dd.read_parquet('error.parquet')
# compute and persist in memory (note: do not use .compute, because
# that would turn the Dask dataframe into a pandas dataframe)
ddf = ddf.drop(cols[0:11], axis=1)
ddf = ddf.persist()
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 2 - end')
As a side note, ddf.drop(cols[0:11], axis=1) does not modify the dataframe in place; if you want the change to stick, you need to assign the result:
ddf = ddf.drop(cols[0:11], axis=1)
Update: there is some relevant discussion here.
Answered By - SultanOrazbayev