Issue
I want to execute tasks asynchronously and concurrently. If task1 is running when task2 arrives, task2 should start right away, without waiting for task1 to complete. I would also like to avoid callbacks by using coroutines.
Here's a concurrent solution with callbacks:
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorFuture:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)

    @staticmethod
    def calculate(n):
        print(f"started n={n}")
        return fibonacci(n)

    def run(self, n):
        # Submit the job and print its result from a callback when it finishes.
        future = self.pool.submit(self.calculate, n)
        future.add_done_callback(lambda f: print(f.result()))


if __name__ == '__main__':
    calculator = FibonacciCalculatorFuture()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")
Its output:
started n=35
started n=32
initial thread can continue its work
3524578
14930352
And here's my effort to get rid of callbacks:
import asyncio
from concurrent.futures import ThreadPoolExecutor

# fibonacci() is the function defined in the previous snippet.


class FibonacciCalculatorAsync:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.ensure_future(self.calculate(n))


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.run(35)
    calculator.run(32)
    calculator.loop.run_forever()
    print("initial thread can continue its work")  # never reached
Output:
started n=35
started n=32
3524578
14930352
In this case the initial thread cannot get past loop.run_forever(), so it never prints the last message and cannot accept new tasks.
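To illustrate the blocking behavior in isolation, here is a minimal sketch. The demo() function and the events list are hypothetical names used only for this illustration, and a timed loop.stop() is scheduled purely so that the example terminates; without it, run_forever() would never return.

```python
import asyncio


def demo():
    events = []
    loop = asyncio.new_event_loop()
    # Scheduled only so the example terminates; otherwise run_forever() never returns.
    loop.call_later(0.05, loop.stop)
    # Runs on the first iteration of the loop, i.e. inside run_forever().
    loop.call_soon(events.append, "inside loop")
    events.append("before run_forever")
    loop.run_forever()  # the calling thread is blocked here until loop.stop()
    events.append("after run_forever")
    loop.close()
    return events


if __name__ == "__main__":
    print(demo())
```

Anything placed after run_forever() in the same thread runs only once the loop has been stopped.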
So, here's my question: is there a way to simultaneously:
- execute tasks concurrently;
- be able to accept new tasks and schedule them for execution right away (along with already running tasks);
- use coroutines and code without callbacks.
Solution
The second requirement in your question can be met by running the asyncio event loop in a dedicated thread and scheduling coroutines onto it with asyncio.run_coroutine_threadsafe. For example:
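import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# fibonacci() is the function defined in the question.


class FibonacciCalculatorAsync:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        # Explicit loop; calling get_event_loop() outside a running loop
        # is deprecated in recent Python versions.
        self.loop = asyncio.new_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        # Thread-safe: schedules the coroutine on the loop running in the other thread.
        asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def start_loop(self):
        thr = threading.Thread(target=self.loop.run_forever)
        thr.daemon = True
        thr.start()


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.start_loop()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")
    calculator.run(10)
    time.sleep(1)  # stands in for the rest of the main thread's work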
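If the initial thread also needs the computed values, note that asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future, so the results can be collected without callbacks. The following is a minimal sketch under that assumption; BackgroundLoop, submit, stop, and demo are hypothetical names introduced for illustration and do not appear in the code above, and smaller inputs are used so that it finishes quickly.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class BackgroundLoop:
    """Hypothetical helper: owns an event loop running in a daemon thread."""

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    async def calculate(self, n):
        # Runs in the loop thread; the blocking work is handed to the pool.
        return await self.loop.run_in_executor(self.pool, fibonacci, n)

    def submit(self, n):
        # Returns a concurrent.futures.Future that any thread can wait on.
        return asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def stop(self):
        # loop.stop() must be requested from inside the loop thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()
        self.pool.shutdown()


def demo():
    bg = BackgroundLoop()
    futures = [bg.submit(n) for n in (20, 15, 10)]
    # The initial thread is free to do other work here before collecting results.
    results = [f.result(timeout=30) for f in futures]
    bg.stop()
    return results


if __name__ == "__main__":
    print(demo())
```

Calling result() blocks only the thread that asks for the value, so the initial thread decides when, or whether, to wait. The explicit stop() is one way to shut the loop down cleanly instead of relying on the daemon thread being killed at exit.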
Answered By - user4815162342