Issue
I have a script that loops over a pandas dataframe and outputs GIS data to a geopackage based on some searches and geometry manipulation. It works when I use a for loop, but with over 4k records it takes a while. Since I have it built as its own function that returns what I need from a row iteration, I tried to run it with multiprocessing:
import pandas as pd, bwe_mapping
from multiprocessing import Pool
#Sample dataframe
bwes = [['id', 7216],['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwedf = pd.read_csv(bwes)
geopackage = "datalocation\geopackage.gpkg"
tracklayer = "tracks"
if __name__=='__main__':
    def task(item):
        bwe_mapping.map_bwe(item, geopackage, tracklayer)
    pool = Pool()
    for index, row in bwedf.iterrows():
        task(row)
    with Pool() as pool:
        for results in pool.imap_unordered(task, bwedf.iterrows()):
            print(results)
When I run this, my Task Manager populates with 16 new Python processes, but there is no sign that anything is being done. Would it be better to use numpy.array_split() to break my pandas dataframe into 4 or 8 smaller ones and run the for index, row in bwedf.iterrows(): loop for each dataframe on its own processor? The rows don't need to be processed in any particular order, as long as I can store the outputs, which are geopandas dataframes, in a list to concatenate into geopackage layers at the end. Should I have put the for loop inside the function and just passed it the whole dataframe and the GIS data to search?
Solution
If you are running on Windows or macOS, multiprocessing uses spawn to create the workers, which means each child must be able to find the function it is going to execute when it imports your main script. Your code has the function definition inside your if __name__=='__main__': block, so the children don't have access to it. Simply moving the function definition to before if __name__=='__main__': will make it work. What is happening is that each child crashes when it tries to run the function, because it never saw its definition.
Minimal code to reproduce the problem:
from multiprocessing import Pool

if __name__ == '__main__':
    def task(item):
        print(item)
        return item

    pool = Pool()
    with Pool() as pool:
        for results in pool.imap_unordered(task, range(10)):
            print(results)
The solution is to move the function definition to before the if __name__=='__main__': line.
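For reference, here is a sketch of the same minimal example rearranged that way, so that workers started with spawn can find the function when they re-import the script:

from multiprocessing import Pool

# task is defined at module level, so children created with spawn
# can look it up after importing this script.
def task(item):
    print(item)
    return item

if __name__ == '__main__':
    with Pool() as pool:
        for results in pool.imap_unordered(task, range(10)):
            print(results)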
Edit: To iterate over rows in a dataframe, this simple example demonstrates how to do it. Note that iterrows returns an (index, row) pair, which is why the item is unpacked inside the function.
import os
import pandas as pd
from multiprocessing import Pool
import time

# Sample dataframe
bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92],
        ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwef = pd.DataFrame(bwes)

def task(item):
    time.sleep(1)
    index, row = item
    # print(os.getpid(), tuple(row))
    return str(os.getpid()) + " " + str(tuple(row))

if __name__ == '__main__':
    with Pool() as pool:
        for results in pool.imap_unordered(task, bwef.iterrows()):
            print(results)
The time.sleep(1) is only there because there is so little work that one worker might grab it all, so I am forcing every worker to wait for the others; you should remove it. The result is as follows:
13228 ('id', 7216)
11376 ('item_id', 3277841)
15580 ('Date', '2019-01-04T00:00:00.000Z')
10712 ('start_lat', -56.92)
11376 ('End_lat', -59.87)
13228 ('start_lon', 45.87)
10712 ('End_lon', 44.67)
It seems like your "example" dataframe is transposed, but you just have to construct the dataframe correctly. I'd recommend you first run the code serially with iterrows before running it across multiple cores.
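As a rough sketch (not from the original answer), one way to get a more conventional shape from those [name, value] pairs is to build a single-row dataframe with the names as columns, assuming that is what the sample is meant to represent:

import pandas as pd

bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'],
        ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
# Convert the [name, value] pairs into one row with the names as columns.
bwedf = pd.DataFrame([dict(bwes)])
print(bwedf)  # one row with columns id, item_id, Date, start_lat, start_lon, End_lat, End_lon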
Obviously, sending data to the workers and getting it back takes time, so make sure each worker is doing a substantial amount of computational work and not just sending results back to the parent process.
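If per-row overhead turns out to dominate, the chunking idea from the question (numpy.array_split) is one way to give each worker a bigger unit of work. A minimal sketch, assuming a hypothetical process_chunk that stands in for the real per-row GIS function and a toy dataframe in place of the real one:

import numpy as np
import pandas as pd
from multiprocessing import Pool

def process_chunk(chunk):
    # Loop over the rows of one chunk inside the worker and collect the
    # per-row results, so each task carries a meaningful amount of work.
    results = []
    for index, row in chunk.iterrows():
        results.append(tuple(row))  # placeholder for the real per-row GIS work
    return results

if __name__ == '__main__':
    # Toy dataframe standing in for the real 4k-row dataframe.
    bwedf = pd.DataFrame({'id': range(100), 'value': range(100)})
    chunks = np.array_split(bwedf, 8)  # roughly one chunk per worker
    with Pool() as pool:
        all_results = []
        for chunk_results in pool.imap_unordered(process_chunk, chunks):
            all_results.extend(chunk_results)
    # all_results can then be concatenated (e.g. with pandas/geopandas concat)
    # and written out as geopackage layers at the end.
    print(len(all_results))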
Answered By - Ahmed AEK