Issue
I have a pyarrow table that looks something like the following:
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
test_table = pa.table([
    pa.array(["a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c", "c", "c", "d", "d", "d", "d", "e", "e", "e", "e", "e", "f", "f", "f", "f", "f", "f"]),
    pa.array([1, 2, 3, 4, 4, 1, 1, 1, 1, 1, 5, 5, 5, 5, 4, 6, 6, 8, 9, 7, 7, 7, 7, 10, 11, 12, 33, 22, 55, 22]),
    pa.array([1, 3, 2, 2, 4, 1, 1, 1, 1, 1, 4, 4, 4, 5, 4, 8, 8, 8, 9, 10, 10, 22, 12, 10, 11, 12, 33, 22, 55, 22]),
    pa.array([1, 2, 3, 3, 4, 1, 1, 1, 1, 1, 6, 6, 5, 6, 4, 6, 6, 6, 9, 7, 33, 22, 22, 10, 11, 12, 33, 22, 55, 22])
], names=["ID", "Col1", "Col2", "Col3"])
I need to get the mean, min/max, and most frequent values of the columns. Most of this is easily done with the grouped aggregations in the documentation: https://arrow.apache.org/docs/python/compute.html#py-grouped-aggrs
And I can implement it with examples like the following:
cols = [name for name in test_table.schema.names if "Col" in name]
test_table.group_by("ID").aggregate([(c, "mean") for c in cols]).to_pandas()
test_table.group_by("ID").aggregate([(c, "max") for c in cols]).to_pandas()
However, getting the most frequent value (aka the mode) appears to be a computational bottleneck: while the function is there for computing the value across an array (https://arrow.apache.org/docs/python/generated/pyarrow.compute.mode.html), it is not present for group-level aggregation.
I have implemented the following for now:
cols = [name for name in test_table.schema.names if "Col" in name]
unique_ids = pc.unique(test_table["ID"])
output_table = pd.DataFrame(columns=test_table.schema.names)
for parts, group_id in enumerate(unique_ids):
    temp = test_table.filter(pc.field("ID") == group_id)
    output_table.loc[parts, "ID"] = group_id.as_py()
    if len(temp) > 0:
        for col in cols:
            # pc.mode returns (mode, count) pairs sorted by count; take the top mode
            output_table.loc[parts, col] = pc.mode(temp[col])[0][0].as_py()
    else:
        print(group_id.as_py() + " has no data. Moving on.....")
print(output_table)
But as a cell-wise loop it takes around 36 minutes to run on my actual data, as opposed to 2.5 seconds or less for the mean and max using the aggregate function.
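The cell-wise loop above can also be expressed as a single pandas group-by (a sketch on a small stand-in frame; timings on the real data are untested, and breaking ties by taking the first modal value is an assumption):

```python
import pandas as pd

# small stand-in for test_table.to_pandas()
df = pd.DataFrame({
    "ID": ["a", "a", "a", "b", "b"],
    "Col1": [1, 2, 2, 5, 5],
    "Col2": [3, 3, 4, 8, 8],
})

cols = [c for c in df.columns if "Col" in c]
# Series.mode() returns all modal values; take the first to break ties
modes = df.groupby("ID")[cols].agg(lambda s: s.mode().iloc[0])
```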
I realize that there is a way to implement user-defined functions in pyarrow: https://arrow.apache.org/docs/python/compute.html#user-defined-functions
But I'm not sure where to begin if I were to create such a thing.
Any suggestions as to how I might apply pc.mode across columns, create a custom group aggregation function and apply it across the columns, or otherwise speed up the workflow, would be greatly appreciated!
Thank you in advance.
Solution
You can implement mode with a couple of group-bys and a sort:
(
    test_table.group_by(["ID", "Col1"])
    .aggregate([([], "count_all")])
    .sort_by("count_all")
    .group_by(["ID"], use_threads=False)
    .aggregate([("Col1", "last")])
)
ID | Col1_last
---|----------
a  | 4
b  | 1
c  | 5
d  | 6
e  | 7
f  | 22
You'll have to do it for each column and then join each column's result back together.
PS: there is an open issue for it, but mode isn't the simplest aggregation to implement given that it requires unbounded state: https://github.com/apache/arrow/issues/20359
Answered By - 0x26res