Issue
I trying to create a proof of concept with Python 3 asyncio, implementing a client that sends heartbeats periodically to a server in order to keep the connection alive.
Note that the server is simply an echo server and doesn't close the connection. But it is important that the client is able to send a heartbeat periodically.
Here is the current implementation:
stream_client.py
import asyncio
class StreamClient:
def __init__(self, heartbeat_int, loop=None):
self.heartbeat_int = heartbeat_int
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
def start(self):
""" start manages the event loop, but it is not a coroutine """
self.loop.run_until_complete(self.get_client())
self.loop.create_task(self.start_timed_session())
msg = self.loop.run_until_complete(self.logon('hello'))
if msg == 'hello':
try:
self.loop.run_forever()
except KeyboardInterrupt:
print('Closing connection with server...')
self.writer.close()
self.loop.close()
else:
print('Logon unsuccessful, closing connection with server...')
self.writer.close()
self.loop.close()
@asyncio.coroutine
def get_client(self):
self.reader, self.writer = yield from asyncio.open_connection(
'127.0.0.1',
9871,
loop=self.loop
)
print('Connection established at "localhost:9871"')
@asyncio.coroutine
def timed_session(self):
yield from asyncio.sleep(self.heartbeat_int)
self.loop.create_task(self.start_timed_session())
@asyncio.coroutine
def start_timed_session(self):
heartbeat_task = self.loop.create_task(self.timed_session())
heartbeat_task.add_done_callback(self.heartbeat)
@asyncio.coroutine
def logon(self, msg):
print('Sending message:', msg)
self.writer.write(msg.encode())
data = yield from self.reader.read(15)
resp = data.decode()
print('Data received:', resp)
return resp
def heartbeat(self, fut):
"""
This is future's callback:
1) Can't be a coroutine
2) Takes a future as an argument
"""
print('> Sending heartbeat...')
self.writer.write('heartbeat'.encode())
# Start the client
client = StreamClient(5)
client.start()
stream_server.py
import asyncio
class StreamServer:
def __init__(self, loop=None):
if loop is not None:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
@asyncio.coroutine
def server_handler(self, reader, writer):
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()
loop = asyncio.get_event_loop()
stream_server = StreamServer(loop)
coro_server = asyncio.start_server(
stream_server.server_handler,
'127.0.0.1',
9871,
loop=stream_server.loop
)
server = loop.run_until_complete(coro_server)
print('Listening on:', server.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
print('Closing server')
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Question:
On the heartbeat()
method the line self.writer.write('heartbeat'.encode())
seems to never be executed.
How can I get it to work?
Solution
The code is being executed on the client side, you just don't have any code on the server-side to receive the message. server_handler
is only called once per connection, not once per message. So, if you want it to be able to receive an infinite number of heartbeats from a given client, you need to set up a loop inside of server_handler
to receive them:
@asyncio.coroutine
def server_handler(self, reader, writer):
while True: # Keep receiving data from client.
data = yield from reader.read(15)
msg = data.decode()
if data is not 'bye':
print('Received data: "{}" from {}'.format(msg, writer.get_extra_info('peername')))
print('Echoing the message...')
writer.write(data)
yield from writer.drain()
else:
print('Received data: "{}" from {}'.format(
data,
writer.get_extra_info('peername')
)
)
print('Closing the connection...')
writer.close()
Answered By - dano
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.