Issue
I'm implementing a producer-consumer program in a server using sockets and asyncio. The problem is that the async function sock_recv() does not seem to work properly when it is used with a socket wrapped in an SSL connection. The following code, which uses a plain (unwrapped) socket, works.
Server side
import asyncio
import random
import socket
import ssl
# Chained assignment: SERVER_ADDRESS holds the (host, port) tuple, while HOST
# and PORT are also bound individually by unpacking that same tuple.
SERVER_ADDRESS = (HOST, PORT) = "127.0.0.1", 8881
async def producer(queue, client_connection, event_loop):
    """Put one random integer on ``queue`` for every chunk read from the socket.

    Args:
        queue: ``asyncio.Queue`` that receives the produced integers.
        client_connection: Non-blocking socket to read from.
        event_loop: Event loop whose ``sock_recv()`` is used for the reads.

    Returns when the peer closes the connection (``sock_recv()`` yields an
    empty bytes object).
    """
    while True:
        print("Waiting for sock_recv")
        data = await event_loop.sock_recv(client_connection, 4096)
        if not data:
            # An empty read means the peer closed the connection. Without this
            # check every following sock_recv() returns b"" immediately and the
            # loop would spin forever, flooding the queue.
            break
        r = random.randint(1, 101)
        print("Produced: %d" % r)
        await queue.put(r)
        # Yield to the event loop so the consumer gets a chance to run.
        await asyncio.sleep(0)
async def consumer(queue):
    """Take integers off ``queue`` forever, printing each one after a delay.

    Args:
        queue: ``asyncio.Queue`` to read the produced integers from.

    This coroutine never returns on its own; it runs until it is cancelled.
    """
    while True:
        print("Waiting for queue.get()")
        r = await queue.get()
        # Simulate slow processing of the item.
        await asyncio.sleep(2)
        print("Consumed: %d" % r)
async def main():
    """Accept a single TCP client and run the producer/consumer pair on it.

    The SSL wrapping is intentionally left commented out: the plain-socket
    variant works, while enabling the wrap makes ``sock_recv()`` misbehave.
    """
    # The listening socket is only needed until one client has been accepted,
    # so it is closed as soon as the ``with`` block ends.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
        listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_socket.bind(SERVER_ADDRESS)
        listen_socket.listen(5)
        ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile="certificate.pem", keyfile="key.pem")
        # NOTE: this is a blocking accept(); it stalls the event loop until a
        # client connects, which is acceptable here because no other task
        # has been started yet.
        client_connection, client_address = listen_socket.accept()

    try:
        # client_connection = ssl_context.wrap_socket(
        #     client_connection, server_side=True
        # )
        client_connection.setblocking(False)
        queue = asyncio.Queue()
        t1 = asyncio.create_task(
            producer(queue, client_connection, asyncio.get_running_loop())
        )
        t2 = asyncio.create_task(consumer(queue))
        await asyncio.wait([t1, t2])
    finally:
        # Release the client socket even when main() is cancelled (Ctrl-C).
        client_connection.close()
if __name__ == "__main__":
    # asyncio.run() creates, runs and closes its own event loop, so no loop
    # needs to be fetched beforehand. A module-level get_event_loop() call
    # only creates a second, unused loop (and warns or fails on newer Pythons).
    asyncio.run(main())
Client side
import socket
# Connect to the server, send one message, and close the socket on exit.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(("127.0.0.1", 8881))
    s.sendall(b"Hello")
Output
Waiting for sock_recv
Waiting for queue.get()
Produced: 49
Waiting for sock_recv
Consumed: 49
Waiting for queue.get()
Here's the problem: when I uncomment the following part
# client_connection = ssl_context.wrap_socket(
# client_connection, server_side=True
# )
It blocks on the sock_recv() function. With the uncommented code, I get the following output:
Output
Waiting for sock_recv
Waiting for queue.get()
Client Code
import socket
import ssl
# ssl.wrap_socket() is deprecated since Python 3.7 and was removed in 3.12;
# an explicit SSLContext is the supported replacement.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# WARNING: certificate and hostname verification are disabled to match the
# defaults of the legacy ssl.wrap_socket() call. This is only acceptable for
# local testing; verify the server certificate in real deployments.
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    with context.wrap_socket(s) as sock:
        # The TLS handshake is performed as part of connect().
        sock.connect(("127.0.0.1", 8881))
        sock.sendall(b"Hello")
Finally, when I shut down the server with Ctrl-C, I get the following output:
^CTask exception was never retrieved
future: <Task finished coro=<producer() done, defined at asyncio_test.py:8> exception=SSLWantReadError(2, 'The operation did not complete (read) (_ssl.c:2488)')>
Traceback (most recent call last):
File "asyncio_test.py", line 11, in producer
await event_loop.sock_recv(client_connection, 4096)
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/selector_events.py", line 352, in sock_recv
return await fut
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/selector_events.py", line 366, in _sock_recv
data = sock.recv(n)
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/ssl.py", line 1037, in recv
return self.read(buflen)
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/ssl.py", line 913, in read
return self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2488)
Traceback (most recent call last):
File "asyncio_test.py", line 42, in <module>
asyncio.run(main())
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
self.run_forever()
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
self._run_once()
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
event_list = self._selector.select(timeout)
File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Edit: I just found out that it works if I pass do_handshake_on_connect=False in the wrap_socket() function in the client code, but then ssl won't work.
Solution
So, it turns out that the async function sock_recv() does not support SSLSocket, since SSL needs to write to a user-space buffer, as described in this SO answer.
The way to work around this issue is to use the transports or streams in asyncio. Here is the working version of the code from the question above.
import asyncio
import random
import socket
import ssl
# Chained assignment: SERVER_ADDRESS holds the (host, port) tuple, while HOST
# and PORT are also bound individually by unpacking that same tuple.
SERVER_ADDRESS = (HOST, PORT) = "127.0.0.1", 8881
async def producer(reader, writer, queue):
    """Put one random integer on ``queue`` for every chunk read from ``reader``.

    Args:
        reader: ``asyncio.StreamReader`` for the client connection.
        writer: ``asyncio.StreamWriter`` for the client connection (unused
            here; accepted so the signature mirrors the stream callback).
        queue: ``asyncio.Queue`` that receives the produced integers.

    Returns when the client closes the connection (``reader.read()`` yields
    an empty bytes object at EOF).
    """
    while True:
        print("Waiting for sock_recv")
        data = await reader.read(16)
        if not data:
            # EOF: every further read() would return b"" immediately, so
            # continuing would busy-loop and flood the queue.
            break
        r = random.randint(1, 101)
        print("Produced: %d" % r)
        await queue.put(r)
        # Yield to the event loop so the consumer gets a chance to run.
        await asyncio.sleep(0)
async def consumer(queue):
    """Take integers off ``queue`` forever, printing each one after a delay.

    Args:
        queue: ``asyncio.Queue`` to read the produced integers from.

    This coroutine never returns on its own; it runs until it is cancelled.
    """
    while True:
        print("Waiting for queue.get()")
        r = await queue.get()
        # Simulate slow processing of the item.
        await asyncio.sleep(2)
        print("Consumed: %d" % r)
async def set_up_producer_consumer(reader, writer):
    """Per-connection callback: run a producer/consumer pair on one client.

    Args:
        reader: ``asyncio.StreamReader`` for the accepted connection.
        writer: ``asyncio.StreamWriter`` for the accepted connection.

    Raises:
        Whatever exception the producer or consumer task fails with.
    """
    queue = asyncio.Queue()
    t1 = asyncio.create_task(producer(reader, writer, queue))
    t2 = asyncio.create_task(consumer(queue))
    try:
        # Wake up as soon as either task fails so the error is surfaced
        # instead of ending up as "Task exception was never retrieved".
        # If no task fails, this still waits for both tasks to finish.
        done, _pending = await asyncio.wait(
            [t1, t2], return_when=asyncio.FIRST_EXCEPTION
        )
        for task in done:
            task.result()  # re-raises the task's exception, if any
    finally:
        # Runs on failure and on cancellation (e.g. server shutdown): stop
        # both tasks and release the connection.
        for task in (t1, t2):
            task.cancel()
        writer.close()
async def main():
    """Start the TLS server and keep it running until it is closed or cancelled.

    Each accepted connection is handled by ``set_up_producer_consumer``.
    """
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile="certificate.pem", keyfile="key.pem")
    server = await asyncio.start_server(
        set_up_producer_consumer,
        HOST,
        PORT,
        family=socket.AF_INET,
        ssl=ssl_context,
        reuse_address=True,
    )
    try:
        # The server is never closed from within, so this blocks until
        # main() is cancelled (e.g. by Ctrl-C under asyncio.run()).
        await server.wait_closed()
    finally:
        # Stop listening on the way out; close() is a no-op if already closed.
        server.close()
if __name__ == "__main__":
    # asyncio.run() creates, runs and closes its own event loop, so no loop
    # needs to be fetched beforehand. A module-level get_event_loop() call
    # only creates a second, unused loop (and warns or fails on newer Pythons).
    asyncio.run(main())
Answered By - Akshay Takkar
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.