Issue
I'm a bit new to Python, so I'm not sure if this question is too naive. I'm trying to grasp the concurrency model.
A third-party function (from a library) connects to multiple hosts over SSH and performs some bash command. This function returns an AsyncGenerator.
My code then iterates through this AsyncGenerator using `async for`:
    # some code before
    asyncgenerator_var = # ...
    async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
        if exit_code != 0:
            self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
            continue
        self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
        self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
    # some code after
The code then calls `await` on this function, but it runs one host after another, not concurrently.
Could someone explain why that is, and what should be done to make it run concurrently?
Solution
In the async model, no two coroutines run simultaneously. The event loop may switch to another coroutine only while the current one is `await`-ing other coroutines or futures.
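As a minimal sketch of that switching behavior (the `worker` coroutine and its delays are made up for illustration): two tasks awaited together finish in roughly the time of the slowest one, because the event loop switches between them at their `await` points.

```python
import asyncio

async def worker(name, delay):
    # Control returns to the event loop only at this await point;
    # between awaits, each coroutine runs uninterrupted.
    await asyncio.sleep(delay)
    return name

async def main():
    # Total time is ~0.2s, not 0.3s: while one worker is awaiting,
    # the event loop runs the other.
    return await asyncio.gather(worker("a", 0.2), worker("b", 0.1))

print(asyncio.run(main()))  # ['a', 'b'] -- gather preserves argument order
```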
The `async for` statement essentially means the event loop may run other scheduled callbacks/tasks between iterations.
The `async for` body still runs in the order the items are yielded by the async generator.
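A small sketch of that ordering guarantee (the generator and host names here are invented stand-ins, not the SSH library's API): even if a later item takes less simulated time to produce, the loop body still sees items strictly in yield order.

```python
import asyncio

async def gen():
    # Yields hosts with different simulated latencies.
    for host, delay in [("slow-host", 0.2), ("fast-host", 0.05)]:
        await asyncio.sleep(delay)
        yield host

async def main():
    seen = []
    async for host in gen():
        seen.append(host)  # order follows the generator's yields
    return seen

print(asyncio.run(main()))  # ['slow-host', 'fast-host']
```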
To run the bodies concurrently, wrap the body in an async function, create a separate task for each input, and finally `gather` the results of all tasks:
    # some code before
    asyncgenerator_var = # ...

    async def task(host, exit_code, stdout, stderr):
        if exit_code != 0:
            self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
            return
        self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
        self.err(f"[{host}]: {''.join(decode_strings(stderr))}")

    tasks = []
    async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
        # create_task expects a coroutine object, so call task(...) here
        tasks.append(asyncio.create_task(task(host, exit_code, stdout, stderr)))
    await asyncio.gather(*tasks)
    # some code after
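Since the SSH library and the `self.err`/`self.out`/`decode_strings` helpers aren't shown, here is a self-contained sketch of the same pattern, with a made-up `fake_results` generator and a `handle` coroutine standing in for the real ones:

```python
import asyncio

async def fake_results():
    # Stand-in for the SSH library's async generator; the real one
    # yields (host, exit_code, stdout, stderr) tuples.
    yield ("host-a", 0, "ok\n", "")
    yield ("host-b", 1, "", "boom\n")

async def handle(host, exit_code, stdout, stderr):
    # Simulated per-host processing that itself awaits.
    await asyncio.sleep(0.01)
    return (host, "error" if exit_code else "ok")

async def main():
    tasks = []
    async for host, exit_code, stdout, stderr in fake_results():
        tasks.append(asyncio.create_task(handle(host, exit_code, stdout, stderr)))
    # gather returns results in task-creation order, even though the
    # tasks themselves ran concurrently.
    return await asyncio.gather(*tasks)

print(asyncio.run(main()))  # [('host-a', 'ok'), ('host-b', 'error')]
```

Note that iterating the generator is still sequential: tasks are only created as items are yielded. The concurrency comes from the task bodies overlapping once scheduled.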
Answered By - Aaron