Issue
I have been attempting to generate a ping scan that uses a limited number of processes. I tried as_completed without success and switched to asyncio.wait with asyncio.FIRST_COMPLETED.
The following complete script works if the offending line is commented out. I'd like to collect the tasks in a set in order to get rid of pending = list(pending); however, pending_set.union(task) throws "await wasn't used with future".
"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))


if __name__ == '__main__':
    asyncio.run(run())
Solution
There are two problems with pending_set.union(task):
First, union doesn't update the set in place; it returns a new set containing the elements of the original set and of the argument it receives.
Second, it accepts an iterable collection (such as another set), not a single element. Thus union attempts to iterate over task, which doesn't make sense. To make things more confusing, task objects are technically iterable in order to be usable in yield from expressions, but they detect iteration attempts in non-async contexts and report the error you've observed.
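Both problems can be reproduced in isolation. The following is a minimal sketch, not taken from the original script: the demo function and the sleep-based task are assumptions made purely for illustration.

```python
import asyncio


async def demo():
    # Hypothetical stand-in task; any pending task behaves the same way.
    task = asyncio.create_task(asyncio.sleep(0.01))
    pending_set = set()

    # Problem 1: union() returns a new set; pending_set itself stays empty.
    new_set = pending_set.union({task})

    # Problem 2: union(task) tries to iterate over the pending task,
    # which asyncio rejects because no await is driving the iteration.
    error = None
    try:
        pending_set.union(task)
    except RuntimeError as exc:
        error = exc

    # Let the task finish without awaiting it directly.
    await asyncio.wait({task})
    return new_set, pending_set, error


if __name__ == '__main__':
    new_set, pending_set, error = asyncio.run(demo())
    print(len(new_set), len(pending_set), repr(error))
```

Here pending_set is still empty at the end, and error holds the RuntimeError raised by the second union call.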
To fix both issues, you should use the add method instead, which operates by side effect and accepts a single element to add to the set:
pending_set.add(task)
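Since asyncio.wait already returns done and pending as sets, the loop can keep pending as a set throughout and drop the list conversion. A rough sketch of that idea follows; fake_ping and run_with_set are hypothetical names, and fake_ping only sleeps instead of spawning a ping process so the example runs anywhere.

```python
import asyncio


async def fake_ping(ip):
    """Hypothetical stand-in for ping(): sleeps briefly and returns 0."""
    await asyncio.sleep(0.01)
    return 0


async def run_with_set(iplist, limit=3):
    """Run at most `limit` pings at a time, tracking pending tasks in a set."""
    iplist = list(iplist)
    pending = set()
    tasks = {}    # task -> ip
    results = {}  # ip -> return code
    while pending or iplist:
        # Top up to `limit` running tasks.
        while len(pending) < limit and iplist:
            ip = iplist.pop()
            task = asyncio.create_task(fake_ping(ip))
            tasks[task] = ip
            pending.add(task)
        # wait() returns two sets, so rebinding pending keeps it a set.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for taskdone in done:
            results[tasks[taskdone]] = taskdone.result()
    return results


if __name__ == '__main__':
    print(asyncio.run(run_with_set(['127.0.0.1', '192.168.1.10'])))
```

Swapping fake_ping for the real ping coroutine should give the behaviour the question was after, though that variant has not been run here.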
Note that a more idiomatic way to limit concurrency in asyncio is using a Semaphore. For example (untested):
async def run():
    limit = asyncio.Semaphore(3)

    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))

    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
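To check that the semaphore really caps concurrency, one option is to count how many pings are in flight at once. The sketch below does that under stated assumptions: run_with_semaphore and fake_ping are hypothetical names, and fake_ping sleeps rather than running an external ping.

```python
import asyncio


async def run_with_semaphore(iplist, limit=3):
    """Ping every address with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)
    running = 0  # pings currently in flight
    peak = 0     # highest value `running` ever reached

    async def fake_ping(ip):
        # Hypothetical stand-in for ping(): sleeps and returns 0.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return 0

    async def wait_and_ping(ip):
        # The semaphore admits only `limit` coroutines into this block.
        async with sem:
            return ip, await fake_ping(ip)

    pairs = await asyncio.gather(*(wait_and_ping(ip) for ip in iplist))
    return dict(pairs), peak


if __name__ == '__main__':
    addresses = [f'10.0.0.{i}' for i in range(7)]
    results, peak = asyncio.run(run_with_semaphore(addresses))
    print(len(results), peak)
```

All tasks are created up front by gather, but only those holding the semaphore get as far as pinging, so peak never exceeds limit.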
Answered By - user4815162342