Issue
I am having trouble recovering from a OpenSSL.SSL.WantReadError
when using asyncio.StreamReader
to do async TCP requests. Here is my setup and attempt to fix the problem.
I create a reader and writer using the following call to wrap a previously created socket.
self.reader, self.writer = await asyncio.open_connection(sock=self.sock)
In this case it is an TLS Socket.
I originally had some code that looked like this to read from the socket.
async def _receive_data(reader: asyncio.StreamReader, sz):
pos = 0
data = None
while pos < sz:
chunk = await reader.read(sz - pos)
if pos == 0:
data = chunk
else:
data += chunk
pos += len(chunk)
return data
I started receiving a OpenSSL.SSL.WantReadError
when trying to read larger responses from the server (~300KB). I assume it also takes the server some time to formulate this particular response so the error makes sense. The problem I am having is, how do I recover from this?
I tried changing my _receive_data
function to
async def _receive_data(reader: asyncio.StreamReader, sz):
pos = 0
data = b""
while pos < sz:
try:
chunk = await reader.read(sz - pos)
except SSL.WantReadError:
print(
f"SSL.WantReadError: Waiting for more data. Received {pos} bytes of {sz} bytes."
)
print("reader._exception: ", repr(reader))
reader.set_exception(None)
await asyncio.sleep(0.1)
continue
if pos == 0:
data = chunk
else:
data += chunk
pos += len(chunk)
return data
I get some output from the print statements that looks like this
SSL.WantReadError: Waiting for more data. Received 0 bytes of 8 bytes.
reader._exception: <StreamReader exception=WantReadError() transport=<_SelectorSocketTransport closed fd=15>>
SSL.WantReadError: Waiting for more data. Received 163832 bytes of 303525 bytes.
reader._exception: <StreamReader exception=WantReadError() transport=<_SelectorSocketTransport closed fd=17>>
It only prints the output twice then hangs. I am assuming this is because the socket is set to None on the reader object as indicated by <_SelectorSocketTransport closed fd=17>>
.
The documentation for WantReadError
states
The operation did not complete; the same I/O method should be called again later, with the same arguments. Any I/O method can lead to this since new handshakes can occur at any time.
The wanted read is for dirty data sent over the network, not the clean data inside the tunnel. For a socket based SSL connection, read means data coming at us over the network. Until that read succeeds, the attempted OpenSSL.SSL.Connection.recv(), OpenSSL.SSL.Connection.send(), or OpenSSL.SSL.Connection.do_handshake() is prevented or incomplete. You probably want to select() on the socket before trying again.
I am unsure how to apply that information to the case of asyncio.Streams. Ideally, I would be able to make the same exact request on the wrapped socket but that is abstracted away with Streams. Maybe the solution is to somehow store the failed request i.e sz - pos
, create a new reader/writer using the same socket, and make the sz-pos
request again? This all feels too messy and I want to assume there is a cleaner way to handle this case
Edit:
I tried the method I mentioned above in my last paragraph but that did not work. If I try to recreate the streams with the same sock I get [Errno 9] Bad file descriptor
when I call asyncio.open_connection()`. The only course of action I see now is to use a different socket but that means I'll need to request the same information until I get a success. Seeing as it takes the server some time to formulate the response this is not ideal
Solution
I ended up figuring out the reason I encountered an untypical issue with TLS + asyncio. I also was able to find a least-disruptive non-ideal solution that worked for my case.
The problem is that asyncio does not work out of the box with pyOpenSSL module's OpenSSL.SSL.Context. asyncio is intended to be used with the built-in ssl modules ssl.SSLContext. The similar context object names were throwing me off. I imagine that if I had used ssl.SSLContext the ssl.SSLWantReadError (as opposed to OpenSSL.SSL.WantReadError :)) would have been handled properly although I did not find anywhere in asyncio that handled this error explicitly. I did however see that asyncio handles the exception BlockingIOError appropriately so I decided to sub-class my socket and return the appropriate error instead.
class _AsyncioSSLConnectionAdapter(SSL.Connection):
def recv(self, bufsiz: int, flags: int | None = None) -> bytes:
"""Custom wrapper around SSL.Connection.recv that raises a BlockingIOError instead of SSL.WantReadError
since pyOpenSSL.SSL.Connection (basically a socket) is not compatible with asyncio streams and raises SSL.WantReadError
in cases when no data is available to read. If we raise SSL.WantReadError asyncio will close the underlying socket
and not allow retries. By raising BlockingIOError asyncio will retry the read operation.
"""
try:
ssl.SSLWantReadError
return super().recv(bufsiz, flags)
except SSL.WantReadError:
raise BlockingIOError("Wrapped SSL.WantReadError")
This is the least invasive solution I could find without replacing my use of OpenSSL.SSL.Context with ssl.SSLContext which would not work for my use case because of the added features of pyOpenSSL (imported as OpenSSL
).
Another solution I found was to create my own Transport
class that uses OpenSSL.SSL.Context
, but this also would have been a bit more work to implement properly. There is a project called aioopenssl that does this, but it does not seem mature and actively maintained enough for me to introduce it as a new dependency.
Answered By - JDogMcSteezy
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.