Issue
I tried to keep this post short, but could not.
I'm trying to understand Python asyncio streams, and for that I'm writing a server and a client. For each connection it receives, the server creates two new coroutines: one for processing received data and another for sending data (be it a response or a status message without a request). Processing must be a coroutine because it will access and modify internal data, but for this example it just acknowledges the request and sends it back to the client. The processing coroutine (process_rqst) prepares a response and puts it on the send queue so the sending coroutine (send_client) can send it. The code is presented below, and I'm currently seeing weird behavior: the client only sends its data after I force the client program to exit with Ctrl + C, or at least that is what I concluded. In my tests I run two clients. As I said, only after I force one client to exit does the server receive the data and send it back (now just to the other client, because the first one was closed).
Server Code:
import asyncio
from asyncio import StreamReader, StreamWriter, Queue
from collections import deque, defaultdict
from contextlib import suppress
from typing import Deque, DefaultDict, Dict

# Dictionary storing messages to send per writer
SEND_QUEUES: DefaultDict[StreamWriter, Queue] = defaultdict(Queue)
# Dictionary storing request messages to process
RQST_QUEUES: Dict[StreamWriter, Queue] = defaultdict(Queue)

async def client(reader: StreamReader, writer: StreamWriter):
    peername = writer.get_extra_info('peername')
    print(f'Remote {peername} connected')
    send_task = asyncio.create_task(send_client(writer, SEND_QUEUES[writer]))
    process_task = asyncio.create_task(process_rqst(writer, RQST_QUEUES[writer]))
    # Send a test message
    await SEND_QUEUES[writer].put("Conenction Opened\r\n")
    try:
        while data := await reader.readline():
            print(f"DATA_RCVD: {data.decode()}")
            # Queue message for processing
            await RQST_QUEUES[writer].put(data.decode().rstrip())
    except asyncio.CancelledError:
        print(f'Remote {peername} connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'Remote {peername} disconnected')
    finally:
        print(f'Remote {peername} closed')
        await SEND_QUEUES[writer].put(None)
        await send_task
        del SEND_QUEUES[writer]
        await RQST_QUEUES[writer].put(None)
        await process_task
        del RQST_QUEUES[writer]

async def send_client(writer: StreamWriter, queue: Queue):
    while True:
        try:
            data = await queue.get()
        except asyncio.CancelledError:
            continue
        if not data:
            break
        try:
            writer.write(data.encode())
            await writer.drain()
        except asyncio.CancelledError:
            writer.write(data.encode())
            await writer.drain()
    writer.close()
    await writer.wait_closed()

async def process_rqst(writer: StreamWriter, queue: Queue):
    with suppress(asyncio.CancelledError):
        while True:
            print(f"PROCESS - awaiting RQST_MSG")
            if not (msg := await queue.get()):
                break
            # Instead of processing, just acknowledge it for now and send to every connection
            for writer in SEND_QUEUES:
                if not SEND_QUEUES[writer].full():
                    print(f"PROCESS - Add data to writer {writer.get_extra_info('peername')}\r\n")
                    await SEND_QUEUES[writer].put(f"ACK {msg}\r\n")

async def main(*args, **kwargs):
    server = await asyncio.start_server(*args, **kwargs)
    async with server:
        await server.serve_forever()

try:
    asyncio.run(main(client, host='', port=25000))
except KeyboardInterrupt:
    print('\r\nBye!')
The client has two coroutines: one that reads input from the keyboard and sends it to the server, and another that receives data from the server. Here is the client code:
import argparse
import asyncio
import aioconsole
from asyncio import StreamReader, StreamWriter

async def msg_reader(reader: StreamReader):
    print(f"READER - msg_reader initialized")
    try:
        while data := await reader.readline():
            print(f"{data.decode()}\r\n> ")
        print(f"READER - Connection Ended")
    except asyncio.CancelledError as cerr:
        print(f"\r\nREADER ERR - {cerr}")
        print(f'READER - Remote connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'\r\nREADER - Remote disconnected')
    finally:
        print(f'READER - Remote closed')

async def msg_writer(writer: StreamWriter):
    print(f'WRITER - msg_writer {writer.get_extra_info("sockname")} initialized')
    try:
        while True:
            msg = await aioconsole.ainput("> ")
            writer.write(msg.encode())
            await writer.drain()
    except asyncio.CancelledError as cerr:
        print(f"\r\nWRITER ERR - {cerr}")
        print(f'WRITER - Remote connection cancelled.')
    finally:
        print(f'WRITER - Remote closed')
        writer.close()
        await writer.wait_closed()

async def main():
    parser = argparse.ArgumentParser(description = "This is the client for the multi threaded socket server!")
    parser.add_argument('--host', metavar = 'host', type = str, nargs = '?', default = "127.0.0.1")
    parser.add_argument('--port', metavar = 'port', type = int, nargs = '?', default = 25000)
    args = parser.parse_args()
    print(f"Connecting to server: {args.host} on port: {args.port}")
    reader, writer = await asyncio.open_connection(host=args.host, port=args.port)
    await asyncio.gather(msg_reader(reader), msg_writer(writer))

try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('\r\nBye!')
Here is the flow of actions. Server output:
python3 sof_asyncio_server.py
Remote ('127.0.0.1', 50261) connected
PROCESS - awaiting RQST_MSG
Remote ('127.0.0.1', 50263) connected
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 01teste o2
Remote ('127.0.0.1', 50263) closed
PROCESS - Add data to writer ('127.0.0.1', 50261)
PROCESS - Add data to writer ('127.0.0.1', 50263)
PROCESS - awaiting RQST_MSG
DATA_RCVD: teste 03
Remote ('127.0.0.1', 50261) closed
PROCESS - Add data to writer ('127.0.0.1', 50261)
PROCESS - awaiting RQST_MSG
Client 1:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50263) initialized
Conenction Opened
>
> teste 01
> teste o2
> ^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
Client 2:
python3 sof_asyncio_client.py
Connecting to server: 127.0.0.1 on port: 25000
READER - msg_reader initialized
WRITER - msg_writer ('127.0.0.1', 50261) initialized
Conenction Opened
>
> teste 03
> ACK teste 01teste o2
>
^C
READER ERR -
READER - Remote connection cancelled.
READER - Remote closed
WRITER ERR -
WRITER - Remote connection cancelled.
WRITER - Remote closed
Bye!
As I said, only after I hit Ctrl + C on the first client did it actually send its data to the server, as can be seen in the server output and in the response shown in the Client 2 output. Similarly, only after I hit Ctrl + C on Client 2 was its data sent, which can also be seen in the server output.
Why don't the clients send the data? Or could it be that the server doesn't receive it? I think it is the former, but I can't figure out why...
Solution
The client sends the data immediately, but the message it sends doesn't include a newline, which the server expects because it uses readline() to read the message. That's why the server only observes the message after EOF, at which point readline() returns the accumulated data even if it doesn't end with or contain a newline. This is also why the two inputs from Client 1 show up concatenated as "teste 01teste o2" in the server output.
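To see this behavior in isolation, here is a minimal sketch that feeds an asyncio.StreamReader by hand instead of going through a socket. The function name demo_readline and the 0.1 second timeout are illustrative choices, not part of the original code; feed_data() and feed_eof() stand in for bytes arriving from the network and for the client closing the connection.

```python
import asyncio


async def demo_readline() -> list:
    """Feed a StreamReader by hand to show when readline() returns."""
    results = []

    reader = asyncio.StreamReader()
    # Two messages arrive without a trailing newline, as the client sends them.
    reader.feed_data(b"teste 01")
    reader.feed_data(b"teste o2")

    try:
        # No newline and no EOF yet, so readline() keeps waiting.
        await asyncio.wait_for(reader.readline(), timeout=0.1)
    except asyncio.TimeoutError:
        results.append("timeout")

    # Closing the connection (Ctrl + C in the client) delivers EOF.
    reader.feed_eof()
    results.append(await reader.readline())  # everything buffered so far
    results.append(await reader.readline())  # empty bytes: end of stream
    return results


if __name__ == "__main__":
    print(asyncio.run(demo_readline()))
    # → ['timeout', b'teste 01teste o2', b'']
```

The first readline() times out because nothing terminates the line, and the buffered bytes only come back, concatenated, once EOF is fed. That matches what the server log shows.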
The loop in the writer needs to look like this:
    while True:
        msg = await aioconsole.ainput("> ")
        writer.write(msg.encode())
        # StreamWriter.write() takes bytes, so the newline must be b'\n'
        writer.write(b'\n')
        await writer.drain()
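As a rough check that newline framing is enough for readline() to return one message per call, the following sketch again feeds a StreamReader directly. Note that frame_line and demo_framing are hypothetical helpers introduced only for this illustration; they are not part of asyncio or of the code above. In the real client, the framed bytes would be passed to writer.write().

```python
import asyncio


def frame_line(msg: str) -> bytes:
    """Encode msg so that it ends with exactly one newline.

    Hypothetical helper for illustration, not an asyncio API.
    """
    return msg.rstrip("\r\n").encode() + b"\n"


async def demo_framing() -> list:
    reader = asyncio.StreamReader()
    # Stand-in for writer.write(frame_line(msg)) on the client side.
    for msg in ("teste 01", "teste o2"):
        reader.feed_data(frame_line(msg))
    # Each readline() returns one complete message, no EOF required.
    first = await reader.readline()
    second = await reader.readline()
    return [first, second]


if __name__ == "__main__":
    print(asyncio.run(demo_framing()))
    # → [b'teste 01\n', b'teste o2\n']
```

With the terminator in place, the two inputs are delivered as separate messages, and the server can strip the newline with rstrip() as it already does.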
Answered By - user4815162342