Issue
I am trying to write a simple application that connects to multiple rudimentary TCP servers with long lived connections where I send/receive data from each connection. Basically receiving events from the server, sending commands back and receiving results from the commands. Think of controlling a device, but i want to control N devices on a single thread.
I have a working non-async, non-blocking implementation, but the time.sleep() are killing the responsiveness or killing the CPU, and using select
is so much cooler.
Given the below example, i want to connect to all three servers, and await receive_message
on each one simultaneously. Currently, it's blocked in the connect
's receive_message(), so I only get this output:
Connecting to 1
Sending password to 1
Waiting for message from 1
I'd like to get something similar to this, not exactly, but showing that the connections are all independently scheduled.
Connecting to 1
Connecting to 2
Connecting to 3
Sending password to 1
Sending password to 2
Sending password to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Connected to 1
Connected to 2
Connected to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Watered down version of what i'm trying. No, the real server isn't this insecure.... this is just an example.
import asyncio
class Connection:
def __init__(self, name, host, port):
self.name = name
self.host = host
self.port = port
self.reader, self.writer = None, None
self.connected = False
async def connect(self):
print(f'Connecting to {self.name}')
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
await self.send_message('password')
response = await self.receive_message()
if response == 'SUCCESS':
self.connected = True
print(f'Connected to {self.name}')
else:
raise Exception('unsuccessful connection')
print(f'Connected to {self.name}')
async def send_message(self, message):
print(f'Sending {message} to {self.name}')
self.writer.write(f'{message}\n'.encode('utf-8'))
async def receive_message(self):
print(f'Waiting for message from {self.name}')
return (await self.reader.readline()).decode('utf-8')
connections = (
Connection(1, 'localhost', 21114),
Connection(2, 'localhost', 21115),
Connection(3, 'localhost', 21116)
)
async def run():
for connection in connections:
await connection.connect()
# how to receive messages from each connection as they are received
for connection in connections:
await connection.receive_message()
asyncio.run(run())
Solution
The await
in the for
loop is effectively serializing your connections. In asyncio parallelism happens at the level of task, so if you want the connections to run in parallel, you need to spawn several tasks (or use a function that will do it for you, such as asyncio.gather
). For example:
async def handle_connection(conn):
await conn.connect()
await conn.receive_message()
# ...
async def run():
tasks = []
for conn in connections:
tasks.append(asyncio.create_task(handle_connection(conn)))
# wait until all the tasks finish
await asyncio.gather(*tasks)
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.