Issue
Currently, I am using `asyncio.wait_for` to poll `websocket.recv` so that I can also call `websocket.send` when I need to:
async def client_reader():
    """Poll the server for newline-terminated messages and print them.

    Connects to ``ws://localhost:{PORT}``, accumulates received fragments
    until the buffer ends with a newline, then prints the stripped line and
    resets the buffer. ``some_other_work()`` is awaited once per loop
    iteration, whether that iteration received data or timed out.
    """
    # ``async with`` closes the connection when the loop is left through an
    # exception or cancellation, instead of leaking the socket.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            # Keep the try body to the one call that can time out, so a
            # TimeoutError raised by anything else is not silently swallowed.
            try:
                # Is there a better way to recv from a websocket?
                result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                # Nothing arrived within the polling interval; fall through
                # so the other work still gets its turn.
                pass
            else:
                if result.endswith('\n'):
                    result = result.rstrip()
                    print(f"result = {result}")
                    result = ''
            # I need to do other asyncio here
            await some_other_work()
All of the documentation for `asyncio` and `websockets` that I could find only used toy examples. Is there a better way to do this?
This is a stand-alone program that simulates what I am doing:
import asyncio
import websockets
import random
import multiprocessing
import time
# TCP port on localhost that the demo server listens on and the client
# connects to.
PORT = 49152
# Sample text that the server sends to the client one line at a time.
# The backslash after the opening quotes suppresses the leading newline.
text = """\
This is a bunch of text that will be used to
simulate a server sending multiple lines of
text to a client with a random amount of delay
between each line.
"""
def get_next_line(source=None):
    """Yield the lines of a text forever, each terminated by a newline.

    Args:
        source: Text to cycle through. When ``None`` (the default), the
            module-level ``text`` is used.

    Yields:
        Each line followed by a newline character, starting over from the
        first line after the last one has been produced.

    Raises:
        ValueError: On the first ``next()`` call, if the text has no lines.
    """
    chosen = text if source is None else source
    # splitlines() does not produce the empty trailing element that
    # split('\n') returns for text ending in a newline, so no spurious
    # blank line is emitted at the end of every pass.
    lines = chosen.splitlines()
    if not lines:
        # Without this guard the loop below would spin forever without
        # ever yielding.
        raise ValueError("no lines to yield")
    while True:
        for line in lines:
            yield line + '\n'


# Single generator instance; delay_server() pulls every outgoing line from it.
line_generator = get_next_line()
async def delay_server(websocket, path):
    """Connection handler: send one line after each random pause, forever.

    Args:
        websocket: The connection to send lines on.
        path: Request path supplied with the connection; not used here.
    """
    max_pause = 5.0
    while True:
        # random.random() is in [0, 1), so each pause is below max_pause.
        pause = random.random() * max_pause
        await asyncio.sleep(pause)
        await websocket.send(next(line_generator))
def server_func():
    """Process entry point: serve ``delay_server`` on localhost:PORT forever.

    A KeyboardInterrupt is swallowed so that Ctrl-C ends the process quietly.
    """
    try:
        startup = websockets.serve(delay_server, "localhost", PORT)
        loop = asyncio.get_event_loop()
        # First bring the server up, then keep the loop alive to serve.
        loop.run_until_complete(startup)
        loop.run_forever()
    except KeyboardInterrupt:
        pass
async def some_other_work():
    """Placeholder for the client's other asynchronous work; prints a note."""
    note = "I occasionally need to call websocket.send() here"
    print(note)
async def client_reader():
    """Poll the server for newline-terminated messages and print them.

    Connects to ``ws://localhost:{PORT}``, accumulates received fragments
    until the buffer ends with a newline, then prints the stripped line and
    resets the buffer. ``some_other_work()`` is awaited once per loop
    iteration, whether that iteration received data or timed out.
    """
    # ``async with`` closes the connection when the loop is left through an
    # exception or cancellation, instead of leaking the socket.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            # Keep the try body to the one call that can time out, so a
            # TimeoutError raised by anything else is not silently swallowed.
            try:
                result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                # Nothing arrived within the polling interval; fall through
                # so the other work still gets its turn.
                pass
            else:
                if result.endswith('\n'):
                    result = result.rstrip()
                    print(f"result = {result}")
                    result = ''
            await some_other_work()
def client_func():
    """Process entry point: run ``client_reader`` until it ends or Ctrl-C."""
    try:
        reader = client_reader()
        asyncio.run(reader)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the client; exit quietly.
        return
# Launch the demo only when this file is executed as a script. Under the
# "spawn" start method every child process re-imports this module; without
# the guard each child would try to start its own server and client.
if __name__ == "__main__":
    server_proc = multiprocessing.Process(target=server_func, daemon=True)
    server_proc.start()
    client_proc = multiprocessing.Process(target=client_func, daemon=True)
    client_proc.start()
    try:
        # Idle in the parent until the user presses Ctrl-C.
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        pass
    # Wait for both children to finish before the parent exits.
    server_proc.join()
    client_proc.join()
Solution
I changed the code to look like this:
async def client_reader(websocket, result):
    """Make one receive attempt and return the updated line buffer.

    Args:
        websocket: Connection whose ``recv()`` is awaited for up to 1 second.
        result: Text accumulated so far for the current line.

    Returns:
        ``result`` unchanged if the wait timed out; ``result`` plus the new
        fragment if the line is still incomplete; ``''`` after a completed
        line (ending in a newline) has been printed.
    """
    try:
        fragment = await asyncio.wait_for(websocket.recv(), timeout=1.0)
    except asyncio.TimeoutError:
        return result
    buffered = result + fragment
    if not buffered.endswith('\n'):
        return buffered
    print(f"result = {buffered.rstrip()}")
    return ''
async def if_flag_send_message():
    """Placeholder for the conditional send; currently only prints a note."""
    note = "if a flag is set, I will call websocket.send here"
    print(note)
async def client_writer(websocket):
    """Run one writer step, then pause for one second.

    Args:
        websocket: The client connection; not used directly in this step.
    """
    pause_seconds = 1.0
    await if_flag_send_message()
    await asyncio.sleep(pause_seconds)
async def client_handler():
    """Run the reader and writer steps concurrently on one connection, forever.

    Each iteration awaits one ``client_reader`` step and one
    ``client_writer`` step together, and carries the reader's returned line
    buffer into the next iteration.
    """
    # ``async with`` closes the connection when the loop is left through an
    # exception or cancellation, instead of leaking the socket.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            # gather() runs both coroutines concurrently and returns their
            # results in argument order; the writer's result is not needed.
            result, _ = await asyncio.gather(
                client_reader(websocket, result),
                client_writer(websocket),
            )
I am still not sure this is the right way to do things, but it works.
Answered By - David Cullen
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.