Issue
Hi, I'm referencing the following question because it's similar to what I'm trying to achieve. However, I'm getting an error that I can't figure out, so I'm looking for some help.
Combining multithreading and multiprocessing with concurrent.futures
Here's my test code:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)

def test(x):
    x**2

def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
Process SpawnProcess-31:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>
Process SpawnProcess-32:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>
I didn't specify the number of threads, since I don't see a reason to for the ThreadPoolExecutor. I'm having trouble understanding what the error actually means and how to fix it. Any help would be appreciated.
Solution
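The error is most likely caused by where the code is run rather than by the call to multithread() itself. On Windows, ProcessPoolExecutor starts its workers with the spawn start method: every task, including partial(multithread, f), is pickled and sent to a fresh Python process, and functions are pickled by reference, that is, by module name and function name. Each worker then re-imports the main script to look multithread up again. The "<module '__main__' (built-in)>" in the message suggests the code was run from an interactive session such as a Jupyter notebook or an IPython console, where there is no script file for the workers to re-import, so the lookup fails. Save the code to a .py file and run it with "python script_name.py", or move test and multithread into a separate module and import them from there.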
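The by-name lookup that fails in the workers can be reproduced in a single process. This is an illustrative sketch only: demo_mod and square are hypothetical names, and a throwaway module stands in for the script's __main__. Pickling a function stores just its module and name, so once that name is missing from the module, unpickling raises the same kind of AttributeError seen in the traceback.

```python
import pickle
import sys
import types

# Hypothetical throwaway module standing in for the script's __main__
demo_mod = types.ModuleType("demo_mod")
exec("def square(x):\n    return x ** 2\n", demo_mod.__dict__)
sys.modules["demo_mod"] = demo_mod

# Pickling a function stores only a reference ("demo_mod", "square"), not its code
payload = pickle.dumps(demo_mod.square)

# While the name exists, unpickling looks it up and returns the same function
result = pickle.loads(payload)(7)
print(result)  # 49

# Simulate a worker whose copy of the module does not define the function
del demo_mod.square
error = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    error = exc
print(type(error).__name__, error)
```

A spawned worker does the same lookup against its own __main__; when that module is an empty stand-in, multithread is simply not there.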
Try this:
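from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0, 1000)

def test(x):
    # return the value; a bare x**2 is computed and then discarded
    return x**2

def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        # list() collects the results and re-raises any worker exception
        return list(thread_executor.map(f, lst))

def multiprocesser(lst, f, n_processors=max(1, (cpu_count() or 2) // 2)):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        # each worker process runs multithread(f, chunk) on one chunk
        results = mp_executor.map(partial(multithread, f), chunks)
        # flatten the per-chunk lists into one list, in input order
        return [y for chunk in results for y in chunk]

if __name__ == '__main__':
    squares = multiprocesser(num_list, test)
    print(len(squares))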
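Two details of the original multithread() matter once the pickling error is gone. test computes x**2 but returns nothing, so every result is None, and Executor.map only re-raises a worker's exception when its results are iterated, so discarding the iterator also discards any errors. A small thread-only sketch of both effects (square_no_return, square and fail are illustrative names, not from the original code):

```python
from concurrent.futures import ThreadPoolExecutor

def square_no_return(x):
    x ** 2  # computed and thrown away, like the original test()

def square(x):
    return x ** 2

def fail(x):
    raise ValueError(f"bad item {x}")

with ThreadPoolExecutor() as executor:
    # Without a return statement every result is None
    discarded = list(executor.map(square_no_return, range(5)))
    kept = list(executor.map(square, range(5)))

    # Results never consumed: the ValueError stays inside the futures, nothing is raised
    executor.map(fail, range(3))

    # Consuming the results re-raises the first worker exception
    caught = None
    try:
        list(executor.map(fail, range(3)))
    except ValueError as exc:
        caught = exc

print(discarded)  # [None, None, None, None, None]
print(kept)       # [0, 1, 4, 9, 16]
print(caught)     # bad item 0
```

The same applies one level up: mp_executor.map in multiprocesser should also be consumed, otherwise a failure inside a worker process goes unnoticed.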
Answered By - Vlad