Issue
Goal: Calculate the mean_absolute_percentage_error (MAPE) for each unique ID.
y - real value
yhat - predicted value
Sample PySpark DataFrame: join_df
+----------+----------+-------+---------+----------+----------+
| ID| ds| y| yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
| Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
| Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929| 693.786|
| Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253| 664.1514|
| Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392| 639.4879|
| Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
| Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268| 665.9529|
+----------+----------+-------+---------+----------+----------+
final_schema = StructType([
    StructField('ds', DateType()),
    StructField('ID', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mape', FloatType())
])
I have tried creating a UDF and applying it to the IDs using the apply function.
from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df
df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()
However, I am getting the error:
PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'
I understand that the UDF is returning the MAPE as a numpy value when it should be a pandas DataFrame, but I am not sure what exactly needs to be done differently in order to get the MAPE for each ID.
Solution
With PandasUDFType.GROUPED_MAP the function must return a pandas DataFrame; Spark is instead receiving a numpy scalar, hence the exception.
You also need to modify the schema so it matches the DataFrame the grouped function actually returns: the ID column holds strings such as Ax849b, so it must be StringType rather than IntegerType, and the field order must match the returned columns.
Also, the grouped-map pandas UDF style is deprecated as of Spark 3.0 in favour of applyInPandas, which takes a plain (undecorated) Python function; I have added its usage as well.
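To see the failure mode in isolation, here is a minimal sketch (hypothetical function names, same sklearn import as in the question) contrasting the return value Spark rejects with the one GROUPED_MAP expects:
from sklearn.metrics import mean_absolute_percentage_error

def broken(pdf):
    # returns a bare numpy scalar -> Spark raises the TypeError above
    return mean_absolute_percentage_error(pdf["y"], pdf["yhat"])

def fixed(pdf):
    # attach the group-level MAPE to every row and return the DataFrame itself
    pdf['mape'] = mean_absolute_percentage_error(pdf["y"], pdf["yhat"])
    return pdf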
Data Preparation
from io import StringIO
import pandas as pd

s = StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59,1298.809,1939.1261,687.48206
Ax849b,2021-07-02,1120.69,1295.552,1892.4929,693.786
Ax849b,2021-07-03,1120.69,1294.079,1923.0253,664.1514
Ax849b,2021-07-04,1120.69,1295.0399,1947.6392,639.4879
Bz383J,2021-07-03,1108.71,1159.4934,1917.6515,652.76624
Bz383J,2021-07-04,1062.77,1191.2385,1891.9268,665.9529
""")

df = pd.read_csv(s, delimiter=',')
sparkDF = sql.createDataFrame(df)  # sql is the active SparkSession
sparkDF.show()
+------+----------+-------+---------+----------+----------+
| ID| ds| y| yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929| 693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253| 664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392| 639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268| 665.9529|
+------+----------+-------+---------+----------+----------+
Pandas UDF - Usage
from pyspark.sql.types import StructType, StructField, StringType, FloatType

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])
from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql import functions as F
from pyspark.sql.functions import PandasUDFType

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df
sparkDF.groupby('ID').apply(gr_mape_val).show()
+------+----------+-------+---------+----------+----------+-----------+
| ID| ds| y| yhat|yhat_lower|yhat_upper| mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552| 693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079| 664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399| 639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385| 665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
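As a quick sanity check outside Spark, the Bz383J value can be reproduced by running sklearn directly on that group's two rows from the sample data:
from sklearn.metrics import mean_absolute_percentage_error

# Bz383J rows: mean(|y - yhat| / |y|)
y = [1108.71, 1062.77]
yhat = [1159.4934, 1191.2385]

print(mean_absolute_percentage_error(y, yhat))  # ~0.0833424, matching the mape column above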
applyInPandas
final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').applyInPandas(gr_mape_val, final_schema).show()
+------+----------+-------+---------+----------+----------+-----------+
| ID| ds| y| yhat|yhat_lower|yhat_upper| mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552| 693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079| 664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399| 639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385| 665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
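As an aside, if sklearn is not a hard requirement, the same per-ID MAPE can be computed with built-in Spark column functions, avoiding the pandas UDF (and its serialization overhead) altogether. A minimal sketch against the same sparkDF:
from pyspark.sql import functions as F

# MAPE per ID = mean(|y - yhat| / |y|), using only built-in functions
mape_df = (
    sparkDF
    .withColumn('ape', F.abs((F.col('y') - F.col('yhat')) / F.col('y')))
    .groupBy('ID')
    .agg(F.mean('ape').alias('mape'))
)

# join back to attach the per-ID mape to every row, as in the output above
sparkDF.join(mape_df, on='ID', how='left').show()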
Answered By - Vaebhav