Issue
I am trying to use asyncssh
and asyncio.gather
to execute multiple concurrent ssh commands. For example:
import asyncio, asyncssh, sys
async def run(ssh, cmd):
return (await ssh.run(cmd, check=True)).stdout
async def main():
host = sys.argv[1]
ssh = await asyncssh.connect(host)
nproc = int(await run(ssh, 'nproc'))
cmds = [run(ssh, 'cpufreq-info -c{} -p'.format(core)) for core in range(nproc)]
for ret in await asyncio.gather(*cmds):
print(ret)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
If I force the value of nproc
to a small number (<10) the program works correctly, but with the real value on my machine (12), I get the following error:
Traceback (most recent call last):
File "./mwe.py", line 17, in <module>
asyncio.get_event_loop().run_until_complete(main())
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "./mwe.py", line 13, in main
for ret in await asyncio.gather(*cmds):
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "./mwe.py", line 5, in run
return (await ssh.run(cmd, check=True)).stdout
File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 3103, in run
process = await self.create_process(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 3009, in create_process
*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/asyncssh/connection.py", line 2927, in create_session
bool(self._agent_forward_path))
File "/usr/local/lib/python3.5/dist-packages/asyncssh/channel.py", line 1012, in create
packet = await self._open(b'session')
File "/usr/local/lib/python3.5/dist-packages/asyncssh/channel.py", line 633, in _open
return await self._open_waiter
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
asyncssh.misc.ChannelOpenError: open failed
Is there a limitation to the number of concurrent commands? If so, how do I find it and how do I control it with asyncio?
Solution
Based on Mikhail's advice and answers on the project issue it seems that the cause is the maximum number of sessions allowed by the ssh server. As I am using an OpenSSH server, my workaround tries to read the maximum number of sessions with a basic parser of /etc/ssh/sshd_config
looking for the MaxSessions
setting. Using this it creates a semaphore that limits the number of outstanding run
calls to prevent the program from reaching the server's limit.
#!/usr/bin/python3
import asyncio, asyncssh, sys
async def run(ssh, sem, cmd):
async with sem:
return (await ssh.run(cmd, check=True)).stdout
async def main():
host = sys.argv[1]
ssh = await asyncssh.connect(host)
max_sessions = (await ssh.run(r'sed -n "s/^MaxSessions\s*\([[:digit:]]*\)/\1/p" ' \
'/etc/ssh/sshd_config', check=True)).stdout
max_sessions = max_sessions or 10
print('MaxSessions {}'.format(max_sessions))
sem = asyncio.Semaphore(max_sessions)
nproc = int(await run(ssh, sem, 'nproc'))
cmds = [run(ssh, sem, 'cpufreq-info -c{} -p'.format(core)) for core in range(nproc)]
for ret in await asyncio.gather(*cmds):
print(ret)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
Answered By - haggai_e
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.