Issue
I have a y.csv file. The file size is 10 MB and it contains data from Jan 2020 to May 2020.
I also have a separate file for each month, e.g. data-2020-01.csv, which contains detailed data. Each month file is around 1 GB.
I'm splitting y.csv by month and then processing the data by loading the relevant month file. This takes too long for a large number of months, e.g. 24 months.
I would like to process the data faster. I have access to an AWS m6i.8xlarge instance, which has 32 vCPUs and 128 GB of memory.
I'm new to multiprocessing, so can someone guide me here?
This is my current code.
import pandas as pd
periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0) # Filesize: ~10 MB
def process(_month_df, _index):
    # Locate the row in the month data nearest to this timestamp
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():  # assumes a single data column
        up_delta = 200
        down_delta = 200
        up_value = value + up_delta
        down_value = value - down_delta
        if value > up_value:
            y.loc[_index, "result"] = 1
            return
        if value < down_value:
            y.loc[_index, "result"] = 0
            return

for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month's records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    for index, row in filtered_y.iterrows():
        process(month_df, index)
Solution
A multithreading pool would be ideal for sharing the y dataframe among threads (obviating the need for shared memory) but is not so good at running the more CPU-intensive processing in parallel. A multiprocessing pool is great for CPU-intensive processing but not so great at sharing data across processes without coming up with a shared-memory representation of your y dataframe.
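As a minimal illustration of that trade-off (a sketch, not part of your code -- the dataframe and worker names here are made up): a thread pool worker can read a module-level dataframe directly, whereas a process pool worker only ever sees a pickled copy of whatever is passed to it and must return its results explicitly.

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool

shared_df = pd.DataFrame({"value": range(5)})  # stands in for the y dataframe

def thread_worker(i):
    # Threads share the parent's memory: shared_df is read in place, no copy.
    return shared_df["value"].iloc[i]

def process_worker(series):
    # A pool process receives a pickled copy of series; mutations would be
    # lost, so any result has to be returned to the parent.
    return int(series.sum())

if __name__ == '__main__':
    with ThreadPool(2) as thread_pool:
        print(thread_pool.map(thread_worker, range(5)))  # [0, 1, 2, 3, 4]
    with Pool(2) as process_pool:
        print(process_pool.apply(process_worker, (shared_df["value"],)))  # 10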
Here I have rearranged your code so that a multithreading pool creates filtered_y for each period (a CPU-intensive operation, but pandas does release the Global Interpreter Lock for certain operations -- hopefully this one). Only one month's worth of data is then passed to a multiprocessing pool, rather than the entire y dataframe, to be processed by worker function process_month. Since each pool process does not have access to the y dataframe, it just returns the indices that need to be updated along with the values to put there.
import pandas as pd
from multiprocessing.pool import Pool, ThreadPool

def process_month(period, filtered_y):
    """
    Returns a list of tuples consisting of (index, value) pairs.
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
            up_delta = 200
            down_delta = 200
            up_value = value + up_delta
            down_value = value - down_delta
            if value > up_value:
                results.append((index, 1))
                break
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month's records
    # The pool worker returns the updates; apply them to y here in the thread:
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool() as multiprocessing_pool, ThreadPool(len(periods)) as thread_pool:
        thread_pool.map(process, periods)
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()
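If the updated y does need to be saved where the comment above suggests, a single write at the end of main would suffice (the output filename here is just an example):

    y.to_csv("y_processed.csv")  # hypothetical output name; call at the end of main()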
Answered By - Booboo