Issue
I want to write a wrapper for calling CPU-demanding functions in asyncio.
I want it to be used like this:
    @cpu_bound
    def fact(x: int):
        res: int = 1
        while x != 1:
            res *= x
            x -= 1
        return res

    async def foo(x: int):
        res = await fact(x)
        ...
At first, I wrote:
    def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            executor = get_executor()  # This is a part where I implemented myself.
            return await loop.run_in_executor(
                executor, functools.partial(func, *args, **kwargs)
            )

        return wrapper
However, I had issues with pickling.
    Traceback (most recent call last):
      File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\queues.py", line 245, in _feed
        obj = _ForkingPickler.dumps(obj)
      File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    _pickle.PicklingError: Can't pickle <function fact at 0x000001C2D7D40820>: it's not the same object as __main__.fact
Maybe the original function and the wrapped one not having the same id is the problem?
So, is there a way to write such a wrapper?
I do know I can use loop.run_in_executor, but having such a wrapper can help a lot.
Solution
"Maybe the original function and the wrapped one not having the same id is the problem?"
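In a way, yes. Before the function is sent to the target process, it is pickled. Pickle serializes a function by reference, that is, by its module and name, and checks that looking up that name returns the very same object. That check fails in your case because the func object in the decorator's scope is different from the fact object in your main module: after the decorator rebinds the name, fact refers to the wrapper, not to the original function.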
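The failure can be reproduced without any process pool. The following is a minimal sketch, not code from the question: the keep_original decorator, the captured dict, and the can_pickle helper are hypothetical names used only for illustration. It pickles the undecorated function after a decorator has rebound its name.

```python
import functools
import pickle

# Hypothetical holder so the undecorated function stays reachable.
captured = {}


def keep_original(func):
    """Toy decorator (illustrative): remember func, return a wrapper under the same name."""
    captured["original"] = func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


@keep_original
def fact(x):
    return x


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False on a PicklingError."""
    try:
        pickle.dumps(obj)
        return True
    except pickle.PicklingError:
        return False


# The module-level name "fact" now points at the wrapper, so pickle's
# lookup by name does not return the original function object.
original_ok = can_pickle(captured["original"])

if __name__ == "__main__":
    print("original function picklable:", original_ok)
```

This is the same situation as in the question: run_in_executor is handed the original function, while the name fact in the module resolves to the wrapper.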
Look at this and this question for some background.
Based on these answers, I created an example of how what you want can be achieved.
The trick is to create a picklable "runner" function that the target process can use to look up your original function in some sort of registry (e.g. a dict) and run it. This is of course just an example. You will probably not want to create your ProcessPoolExecutor in the decorator.
    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    import functools

    # Registry mapping a function name to the original, undecorated function.
    original_functions = {}

    def func_runner(name, *args):
        # Runs in the worker process: look up the original function by name and call it.
        return original_functions[name](*args)

    def cpu_bound(func):
        original_functions[func.__name__] = func

        @functools.wraps(func)
        async def wrapper(*args):
            # For illustration only: a new single-worker pool is created on every call.
            with ProcessPoolExecutor(1) as pool:
                res = await asyncio.get_running_loop().run_in_executor(
                    pool, functools.partial(func_runner, func.__name__, *args)
                )
            return res

        return wrapper

    @cpu_bound
    def fact(arg):
        return arg

    async def foo():
        res = await fact("ok")
        print(res)

    if __name__ == "__main__":
        asyncio.run(foo())
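Since the example above builds a new ProcessPoolExecutor on every call and only forwards positional arguments, here is one possible variation, offered as a sketch rather than a definitive implementation. It assumes a lazily created, shared pool and forwards keyword arguments as well. The names get_executor, set_executor, _registry, and _run_registered are hypothetical (the question mentions its own get_executor, whose implementation is not shown). The registry is keyed by __qualname__ only, so two registered functions with the same qualified name in different modules would collide.

```python
import asyncio
import functools
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional

# Hypothetical registry: qualified name -> original (undecorated) function.
_registry = {}
# Hypothetical shared pool, created on first use.
_executor: Optional[Executor] = None


def get_executor() -> Executor:
    """Return the shared executor, creating a process pool on first use."""
    global _executor
    if _executor is None:
        _executor = ProcessPoolExecutor()
    return _executor


def set_executor(executor: Executor) -> None:
    """Replace the shared executor (e.g. with a pool configured elsewhere)."""
    global _executor
    _executor = executor


def _run_registered(key, args, kwargs):
    # Picklable module-level runner: looks up the original function by key
    # in the process that executes the call, then invokes it.
    return _registry[key](*args, **kwargs)


def cpu_bound(func):
    # Keyed by qualified name only; the module name is left out because the
    # main module can be imported under a different name in a spawned worker.
    key = func.__qualname__
    _registry[key] = func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(_run_registered, key, args, kwargs)
        return await loop.run_in_executor(get_executor(), call)

    return wrapper


@cpu_bound
def fact(x: int) -> int:
    res = 1
    while x > 1:
        res *= x
        x -= 1
    return res


async def main():
    print(await fact(10))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    finally:
        get_executor().shutdown()
```

Only the key and the arguments cross the process boundary, so the arguments and the return value still have to be picklable. Decorated functions need to be defined at import time in a module the worker process also imports, otherwise the registry lookup in the worker will fail.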
Answered By - thisisalsomypassword