Issue
I'm using the structural time series (STS) module from TensorFlow Probability to generate forecasts. It performs well on the sample of data I tested, but I now want to apply it at a wider scope, so I would like to run the STS model in PySpark.
I have a dataset that looks like this:
| Id | Date | value |
|----|------------|-------|
| 1 | 01/01/2021 | 10 |
| 1 | 01/02/2021 | 15 |
| 1 | 01/03/2021 | 11 |
| 2 | 01/01/2021 | 100 |
| 2 | 01/02/2021 | 120 |
| 2 | 01/03/2021 | 90 |
| ... | ... | ... |
I would like to create a forecast for each Id (each having the same number of entries) by applying the model I built earlier to each one. What would be the best way to do that?
Solution
I solved it by using a grouped-map pandas_udf in PySpark:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Create the pandas UDF (grouped map: the function receives one
# pandas DataFrame per Id and must return one with the same schema):
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def Forecast(pdf):
    # From a pandas DataFrame, build a series with the timestamp as index
    pdf = pdf.sort_values(by=['Date'])
    PTS = pdf.drop(columns=['Id'])
    PTS.set_index('Date', inplace=True)
    PTS.index = utils.add_freq(PTS.index, 'MS')  # 'MS' = month start
    PTS.loc[:, 'value'] = PTS.loc[:, 'value'].astype(float)

    # Train on everything strictly before Split_Date
    _train = PTS['value'][PTS.index < Split_Date]
    train = _train.to_numpy().reshape(-1, 1)
    forecast_distribution = utils.myForecast(train)
    fcst_mu = forecast_distribution.mean().numpy()[..., 0]

    # Write the forecast mean back into the rows on/after the split
    list_date = pdf.loc[pdf['Date'] >= Split_Date, 'Date'].tolist()
    for i in range(len(list_date)):
        pdf.loc[pdf['Date'] == list_date[i], 'value'] = fcst_mu[i]
    return pdf

# Apply the function per group of Id:
df = df.groupby('Id').apply(Forecast)
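The grouped-map pattern can be checked locally with plain pandas before running it on Spark. Below is a minimal sketch of that check: the column names match the question's table, but `forecast_group` and the constant "forecast" (the mean of the training rows) are stand-ins for the real STS model, not part of the original answer:

```python
import pandas as pd

def forecast_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for the real model: overwrite rows on/after the split
    # date with the mean of the earlier (training) rows.
    split = pd.Timestamp('2021-03-01')
    pdf = pdf.sort_values('Date').copy()
    train_mean = pdf.loc[pdf['Date'] < split, 'value'].mean()
    pdf.loc[pdf['Date'] >= split, 'value'] = train_mean
    return pdf

df = pd.DataFrame({
    'Id': [1, 1, 1, 2, 2, 2],
    'Date': pd.to_datetime(['2021-01-01', '2021-02-01', '2021-03-01'] * 2),
    'value': [10.0, 15.0, 11.0, 100.0, 120.0, 90.0],
})

# group_keys=False keeps the flat row layout, mirroring what the
# Spark grouped-map UDF produces (one transformed frame per Id).
out = df.groupby('Id', group_keys=False).apply(forecast_group)
```

Once the per-group function behaves as expected on a small frame like this, the same body can be dropped into the pandas_udf above.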
Here myForecast is an STS model created in another file, and add_freq is a helper that attaches the period (frequency) to the index.
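The add_freq helper itself is not shown in the answer. A minimal sketch of such a helper, assuming it simply rebuilds the index with an explicit frequency (pandas validates that the existing dates conform), could look like:

```python
import pandas as pd

def add_freq(idx: pd.DatetimeIndex, freq: str) -> pd.DatetimeIndex:
    # Rebuild the index with an explicit frequency; pandas raises a
    # ValueError if the existing dates don't conform to `freq`.
    return pd.DatetimeIndex(idx, freq=freq)
```

A frequency on the index matters here because STS forecasting needs evenly spaced steps to know what "one step ahead" means.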
Answered By - THB