Issue
A backend endpoint returns a Parquet file as an octet-stream.
In Pandas I can read it like this:
result = requests.get("https://..../file.parquet")
df = pd.read_parquet(io.BytesIO(result.content))
Can I do the same in Dask somehow?
This code:
dd.read_parquet("https://..../file.parquet")
raises an exception (obviously, because the response is a bytes-like object):
File "to_parquet_dask.py", line 153, in <module>
main(*parser.parse_args())
File "to_parquet_dask.py", line 137, in main
download_parquet(
File "to_parquet_dask.py", line 121, in download_parquet
dd.read_parquet(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
read_metadata_result = engine.read_metadata(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
parts, pf, gather_statistics, base_path = _determine_pf_parts(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 148, in _determine_pf_parts
elif fs.isdir(paths[0]):
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
raise result[0]
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
result[0] = await coro
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 418, in _isdir
return bool(await self._ls(path))
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 195, in _ls
out = await self._ls_real(url, detail=detail, **kwargs)
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 150, in _ls_real
text = await r.text()
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1082, in text
return self._body.decode(encoding, errors=errors) # type: ignore
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 7: invalid start byte
Update
With the fsspec change from @mdurant's answer applied, I got this error:
ValueError: Cannot seek streaming HTTP file
So I prepended "simplecache::" to my URL, and now I get the following:
Traceback (most recent call last):
File "to_parquet_dask.py", line 161, in <module>
main(*parser.parse_args())
File "to_parquet_dask.py", line 145, in main
download_parquet(
File "to_parquet_dask.py", line 128, in download_parquet
dd.read_parquet(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
read_metadata_result = engine.read_metadata(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
parts, pf, gather_statistics, base_path = _determine_pf_parts(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 185, in _determine_pf_parts
pf = ParquetFile(
File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fastparquet/api.py", line 127, in __init__
raise ValueError("Opening directories without a _metadata requires"
ValueError: Opening directories without a _metadata requiresa filesystem compatible with fsspec
Temporary workaround
This approach may be dirty and suboptimal, but it more or less works:
@dask.delayed
def parquet_from_http(url, token):
    result = requests.get(
        url,
        headers={'Authorization': token}
    )
    return pd.read_parquet(io.BytesIO(result.content))

delayed_download = parquet_from_http(url, token)
df = dd.from_delayed(delayed_download, meta=meta)
P.S. The meta argument is necessary in this approach: without it, Dask calls the function twice, once to infer meta and then again to compute the result, so two requests are made.
Solution
This is not an answer, but I believe the following change in fsspec will fix your problem. If you would be willing to try and confirm, we can make this a patch.
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -472,7 +472,10 @@ class HTTPFileSystem(AsyncFileSystem):
     async def _isdir(self, path):
         # override, since all URLs are (also) files
-        return bool(await self._ls(path))
+        try:
+            return bool(await self._ls(path))
+        except (FileNotFoundError, ValueError):
+            return False
(We can put this in a branch, if that makes it easier for you to install.)
-edit-
The second problem (which is the same in both parquet engines) stems from the server either not providing the size of the file or not allowing range requests (range-gets). Reading the parquet format requires random access to the data. The only way around this (short of improving the server) is to copy the whole file locally, e.g., by prepending "simplecache::" to your URL.
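To illustrate why random access matters, here is a minimal sketch using only the standard library. It relies on the layout of an unencrypted Parquet file, which ends with a 4-byte little-endian footer length followed by the magic bytes PAR1, so a reader has to jump to the end of the file before it can locate the metadata. The helper name read_footer_length and the synthetic buffer are hypothetical and made up for this illustration; this is not how fastparquet or pyarrow implement it.

```python
import io
import struct

MAGIC = b"PAR1"  # 4-byte marker at both the start and the end of a Parquet file


def read_footer_length(f):
    """Return the footer metadata length of a Parquet-style file.

    `f` must be a seekable binary file object (hypothetical helper).
    """
    # Jump to the last 8 bytes. A non-seekable HTTP stream cannot do this,
    # which is what "Cannot seek streaming HTTP file" complains about.
    f.seek(-8, io.SEEK_END)
    tail = f.read(8)
    if tail[4:] != MAGIC:
        raise ValueError("trailing PAR1 magic bytes not found")
    (length,) = struct.unpack("<I", tail[:4])  # little-endian 32-bit length
    return length


# Synthetic stand-in that only mimics the layout; it is NOT a real Parquet file.
fake_footer = b"\x00" * 5
buf = io.BytesIO(
    MAGIC + b"data" + fake_footer + struct.pack("<I", len(fake_footer)) + MAGIC
)
footer_length = read_footer_length(buf)
```

An in-memory buffer (as in the Pandas snippet in the question) or a locally cached copy supports this kind of seek, whereas a streaming HTTP response without a known size or range support does not.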
Answered By - mdurant