Issue
I am using a networking library that provides a wrapper for using its coroutine functions with asyncio. When I wrote a test that randomly closes the connection (to see whether my program is resilient under bad conditions), I found that it hangs indefinitely.
It looked like a bug in the wrapper provided by the library, because the program hangs waiting on the callback from either loop.add_reader() or loop.add_writer(), but then I could not find out how to be notified when the socket is closed.
This is a minimal program that reproduces the problem:
import asyncio
import socket

async def kill_later(c):
    await asyncio.sleep(0.1)
    c.close()

async def main():
    loop = asyncio.get_running_loop()
    c = socket.create_connection(('www.google.com', 80))
    c.setblocking(0)
    ev = asyncio.Event()
    loop.add_reader(c, ev.set)

    # Closes the socket after 0.1 s:
    asyncio.create_task(kill_later(c))

    print("waiting...")
    #### ↓ THIS WAITS FOREVER ↓ ####
    await ev.wait()

asyncio.run(main())
My question: how can I be notified by the asyncio loop that a socket has been closed?
EDIT: due to popular demand, I made the socket non-blocking, but it makes no difference, because add_reader() doesn't try to perform any I/O on the socket; it merely watches for when it is ready.
Solution
Your test program is flawed. A call to c.close() doesn't emulate the socket being closed by the other end; it closes your own file descriptor and makes it inaccessible. You can think of close(fd) as breaking the link between the number fd and the underlying OS resource. After that, reading and polling fd becomes meaningless because the number no longer refers to anything. As a result, epoll() can't and won't report a closed file descriptor as "readable".
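To make the "broken link" idea concrete, here is a small sketch, assuming a Unix-like system. It uses socket.socketpair() instead of a real network connection (my substitution, not part of the original test), and the variable names are only illustrative. After the local close(), the socket object reports -1 as its descriptor and the old number is rejected by the OS:

```python
import errno
import os
import socket

# socketpair() gives two connected sockets without touching the network
# (an assumption made for this sketch; the question used a TCP connection).
a, b = socket.socketpair()
fd = a.fileno()              # the integer an event loop would be polling

a.close()                    # local close: releases the number itself

closed_fileno = a.fileno()   # a closed socket object reports -1
try:
    os.fstat(fd)             # the old number no longer names an open file
    stale_errno = None
except OSError as exc:
    stale_errno = exc.errno  # expected to be EBADF ("bad file descriptor")

b.close()
print(closed_fileno, stale_errno == errno.EBADF)
```

Nothing is left for the loop to watch on that descriptor, which is consistent with the reader callback never firing in the test program.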
The way to test the condition you want to test is by having the other end close the connection. The easiest way to do that is by spawning another process or thread as a mock server. For example:
import asyncio, threading, socket, time

def start_mock_server():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('localhost', 10000))
    s.listen(1)
    def serve():
        # Accept one client, then hang up on it after a second.
        conn, addr = s.accept()
        time.sleep(1)
        conn.close()
        s.close()
    threading.Thread(target=serve).start()

async def main():
    loop = asyncio.get_running_loop()
    start_mock_server()
    c = socket.create_connection(('localhost', 10000))
    c.setblocking(0)
    ev = asyncio.Event()
    # The callback fires once the server closes its end of the connection.
    loop.add_reader(c.fileno(), ev.set)
    print("waiting...")
    await ev.wait()
    print("done")

asyncio.run(main())
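Once the reader callback fires, the usual way to tell "the peer closed" apart from "data arrived" is to call recv(): an empty bytes object means end-of-file. The sketch below is one way to put that together, assuming a Unix-like system with a selector-based event loop. The helper names wait_for_peer_close and demo are hypothetical, and socketpair() stands in for the mock server so that the example needs no port:

```python
import asyncio
import socket


async def wait_for_peer_close(sock: socket.socket) -> bytes:
    """Wait until sock becomes readable, then return what recv() yields."""
    loop = asyncio.get_running_loop()
    readable = asyncio.Event()
    loop.add_reader(sock.fileno(), readable.set)
    try:
        await readable.wait()
    finally:
        # Stop watching before the descriptor is closed or reused.
        loop.remove_reader(sock.fileno())
    return sock.recv(4096)   # b"" here means the other end closed


async def demo() -> bytes:
    # socketpair() replaces the mock server (assumption for this sketch).
    ours, theirs = socket.socketpair()
    ours.setblocking(False)
    loop = asyncio.get_running_loop()
    loop.call_later(0.1, theirs.close)   # the *other* end hangs up
    try:
        return await wait_for_peer_close(ours)
    finally:
        ours.close()


result = asyncio.run(demo())
print(repr(result))
```

If recv() returns a non-empty value instead, the peer sent data and the connection is still open, so real code would keep reading until it sees the empty result.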
Answered By - user4815162342