Issue
Goal: Find an efficient (ideally the fastest) way to iterate over a table by column and run a function on each column, in Python or with a Python library.
Background: I have been exploring ways to speed up my functions. I have two models/algorithms that I want to run: one small, one large (uses torch), and the large one is slow. I have been using the small one for testing. The small model is a seasonal decomposition of each column.
Setup:
Testing environment: EC2 t2.large, x86_64
Python version: 3.11.5
Polars: 0.19.13
pandas: 2.1.1
numpy: 1.26.0
demo data in pandas/polars:
import numpy as np
import pandas as pd
import polars as pl

rows = 11020
columns = 1578
data = np.random.rand(rows, columns)
df = pd.DataFrame(data)
# df_p = pl.from_pandas(df)  # convert if needed
Pandas
pandas and dict:
from statsmodels.tsa.seasonal import seasonal_decompose
import pandas as pd

class pdDictTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pd.Series = None) -> np.ndarray:
        # store data as an instance variable, then decompose
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {}
        for column in dataframe.columns:
            trend_data_dict[column] = cls().process_col(dataframe[column])
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes
import timeit

start = timeit.default_timer()
trend_tensor = pdDictTrendExtractor.process_df(df)
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in " + str(execution_time))
Program Executed in 14.349091062998923
with list comprehension instead of for loop:
class pdDict2TrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pd.Series = None) -> np.ndarray:
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pd.DataFrame(trend_data_dict, index=dataframe.index)
        return trend_dataframes
Program Executed in 14.343959668000025
Class using pandas and torch:
from statsmodels.tsa.seasonal import seasonal_decompose
import torch
import pandas as pd

class pdTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pd.Series = None) -> torch.Tensor:
        # Store data as an instance variable
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes
start = timeit.default_timer()
trend_tensor = pdTrendExtractor.process_df(df)  # pandas frame; df_p here appears to be a typo
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in " + str(execution_time))
Program Executed in 23.14214362200073
with dict, multiprocessing & list comprehension: As suggested by @roganjosh & @jqurious below.
from multiprocessing import Pool

class pdMTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pd.Series = None) -> np.ndarray:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        trend = result.trend.fillna(0).values
        return trend

    @classmethod
    def process_df(cls, dataframe: pd.DataFrame) -> pd.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(
                dataframe.columns,
                pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns]),
            ))
        return pd.DataFrame(trend_data_dict, index=dataframe.index)
Program Executed in 4.582350738997775. Nice and fast.
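Usage follows the same timing pattern as above. One caveat (an assumption about other setups; it runs as-is with fork on Linux): on platforms where the default start method is "spawn" (Windows, macOS), the Pool call must sit under an if __name__ == "__main__": guard, e.g.:

if __name__ == "__main__":  # required where the start method is "spawn"
    start = timeit.default_timer()
    trend_df = pdMTrendExtractor.process_df(df)
    stop = timeit.default_timer()
    print("Program Executed in " + str(stop - start))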
Polars
Polars & torch:
class plTorTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pl.Series = None) -> torch.Tensor:
        # Store data as an instance variable
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        # result.trend is a numpy array here; replace NaN with 0
        result.trend[np.isnan(result.trend)] = 0
        trend = result.trend
        return torch.tensor(trend, dtype=torch.float32).view(-1, 1)

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> torch.Tensor:
        trend_dataframes = torch.Tensor()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = torch.cat((trend_dataframes, trend_data), dim=1)
        return trend_dataframes
Program Executed in 13.813817326999924
polars & lambda:
start = timeit.default_timer()
df_p = df_p.select([
    pl.all().map_batches(
        lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)
    ).fill_nan(0)
])
stop = timeit.default_timer()
execution_time = stop - start
print("Program Executed in " + str(execution_time))
Program Executed in 82.5596211330012
I suspect this is written poorly and that this is why it is so slow. I have yet to find a better method.
So far I have tried apply_many, apply, map, map_batches, and map_elements, with_columns vs. select, and a few other combinations.
polars only, for loop:
class plTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pl.Series = None) -> pl.DataFrame:
        # Store data as an instance variable
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        # Handle missing values by replacing NaN with 0
        result.trend[np.isnan(result.trend)] = 0
        return pl.DataFrame({column_data.name: result.trend})

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_dataframes = pl.DataFrame()
        for column in dataframe.columns:
            trend_data = cls().process_col(dataframe[column])
            trend_dataframes = trend_dataframes.hstack(trend_data)
        return trend_dataframes
Program Executed in 13.34212675299932
with list comprehensions:
I tried Polars with a list comprehension but had difficulty with the syntax; a sketch of what I was aiming for is below.
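Something like the following is the shape I was after (a sketch, not benchmarked; trend_col is a hypothetical helper, and pl.DataFrame accepts a list of named Series):

def trend_col(column_data: pl.Series, period: int = 365) -> pl.Series:
    result = seasonal_decompose(column_data, model="Additive", period=period)
    result.trend[np.isnan(result.trend)] = 0  # trend is a numpy array here
    return pl.Series(column_data.name, result.trend)

trend_df = pl.DataFrame([trend_col(df_p[column]) for column in df_p.columns])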
with a dict & for loop:
Program Executed in 13.743039597999996
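For reference, this variant only differs from plDict2TrendExtractor (below) in process_df, which fills the dict with an explicit loop; a sketch:

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_data_dict = {}
        for column in dataframe.columns:
            trend_data_dict[column] = cls().process_col(dataframe[column])
        return pl.DataFrame(trend_data_dict)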
with dict & list comprehension:
class plDict2TrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pl.Series = None) -> pl.Series:
        self.data = column_data
        result = seasonal_decompose(self.data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        trend_data_dict = {column: cls().process_col(dataframe[column]) for column in dataframe.columns}
        trend_dataframes = pl.DataFrame(trend_data_dict)
        return trend_dataframes
Program Executed in 13.008102383002552
with dict, multiprocessing & list comprehension: As suggested by @roganjosh & @jqurious below.
from multiprocessing import Pool

class plMTrendExtractor:
    def __init__(self, period: int = 365) -> None:
        self._period = period
        self._model = 'Additive'

    def process_col(self, column_data: pl.Series = None) -> pl.Series:
        result = seasonal_decompose(column_data, model=self._model, period=self._period)
        result.trend[np.isnan(result.trend)] = 0
        return pl.Series(result.trend)

    @classmethod
    def process_df(cls, dataframe: pl.DataFrame) -> pl.DataFrame:
        with Pool() as pool:
            trend_data_dict = dict(zip(
                dataframe.columns,
                pool.map(cls().process_col, [dataframe[column] for column in dataframe.columns]),
            ))
        return pl.DataFrame(trend_data_dict)
Program Executed in 4.997288776001369. Nice!
With LazyFrame?
I can add .lazy() and .collect() to the df_p.select() method above, but doing so does not improve the time. One of the key issues seems to be that the function passed to lazy operations needs to be lazy too. I was hoping it might run each column in parallel. For reference, the lazy variant is sketched below.
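This is what the lazy version of the .select() approach looks like (timings were essentially unchanged versus the eager version above):

out = (
    df_p.lazy()
    .select(
        pl.all().map_batches(
            lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)
        ).fill_nan(0)
    )
    .collect()
)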
current conclusions & notes
- I am seeing half a second to a second of variation between some of the runs.
- Pandas with a dict seems reasonable. If you care about the index, this can be a good option.
- Polars with a dict and list comprehension is the "fastest", but not by much; given the run-to-run variation, the difference is even smaller.
- Both options also have the benefit of not needing additional packages.
- There seems to be room for improvement in the Polars code, but I am not sure it would improve the time much, since the main compute cost is seasonal_decompose itself, which takes ~0.012 seconds per column when run alone (see the timing sketch after this list).
- Open to any feedback on improvements.
- Warning: I haven't done full output validation yet on the functions above.
- How the value is returned from process_col does have a minor impact on speed, as expected, and is part of what I was tuning here. For example, with Polars, returning a numpy array was slower; returning a numpy array but declaring -> pl.Series was about the same speed, with one or two trials being faster (than above).
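For reference, a minimal sketch of how the per-column figure mentioned above can be measured (using the demo df from the setup):

# time seasonal_decompose alone on the first column of the demo data
t = timeit.timeit(
    lambda: seasonal_decompose(df[0], model="Additive", period=365),
    number=50,
)
print(t / 50)  # ~0.012 s per column on this machine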
after feedback/added multiprocessing
- Surprise, surprise: multiprocessing for the win. This holds regardless of pandas or Polars.
Solution
With regards to Polars, using .select() and .map_batches() in this type of situation is kind of an "anti-pattern".
You are putting all of the data through the Polars expression engine, only to pass it back out to Python to run your external function, and then pass it back into Polars again.
You can bypass that and simply pass each Series directly to seasonal_decompose() (similar to how you loop through each column in the Pandas approach):
pl.DataFrame({
    col.name: seasonal_decompose(col, model="Additive", period=365).trend
    for col in df_p
})
One thing I did notice, though, is that if you create a LazyFrame per column and use pl.collect_all(), it speeds up the .map_batches() approach by ~50%. (Perhaps this could be investigated.) It is still slightly slower than the comprehension, though.
lf = df_p.lazy()

lazy_columns = [
    lf.select(
        pl.col(col).map_batches(
            lambda x: pl.Series(seasonal_decompose(x, model="Additive", period=365).trend)
        )
    )
    for col in lf.columns
]

out = pl.concat(pl.collect_all(lazy_columns), how="horizontal")
Essentially the question becomes "How can I parallelize a Python for loop?"
Which, as @roganjosh pointed out, is done with multiprocessing.
from multiprocessing import get_context

...

if __name__ == "__main__":
    df_p = ...
    with get_context("spawn").Pool() as pool:
        columns = pool.map(process_column, (col for col in df_p))
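Here process_column is assumed to be a module-level function (with "spawn", the mapped function must be importable by the worker processes); e.g. something along these lines:

import numpy as np
import polars as pl
from statsmodels.tsa.seasonal import seasonal_decompose

# assumed definition; the snippet above elides it
def process_column(col: pl.Series) -> pl.Series:
    result = seasonal_decompose(col, model="Additive", period=365)
    result.trend[np.isnan(result.trend)] = 0
    return pl.Series(col.name, result.trend)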
Out of interest, the example runs ~50% faster for me with multiprocessing versus the regular comprehension.
But it's very task/data/platform-specific, so you would have to benchmark it locally.
Answered By - jqurious