Issue
Is there a way to consume messages twice from a websocket connection using two separate async for
loops?
Running the code below gives RuntimeError: cannot call recv while another coroutine is already waiting for the next message.
import websockets
import asyncio
async def foo(ws):
async for msg in ws:
print(f"foo: {msg}")
async def bar(ws):
async for msg in ws:
print(f"bar: {msg}")
async def main():
async with websockets.connect("wss://echo.websocket.org") as ws:
asyncio.create_task(foo(ws))
asyncio.create_task(bar(ws))
await ws.send("Hello")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Solution
You could create a broadcast function that reads through ws
once, and transmits each message into multiple generators. For example (untested):
def broadcast(stream, num):
# iterate over the stream just once, but put each message into
# multiple queues
queues = []
async def consume():
async for msg in stream:
for queue in queues:
await queue.put(msg)
for queue in queues:
await queue.put(None)
asyncio.create_task(consume())
# create the queues and return the generators that transmit
# their contents
async def transmit(queue):
while True:
msg = await queue.get()
if msg is None:
break
yield msg
iters = []
for _ in range(num):
queue = asyncio.Queue()
iters.append(transmit(queue))
queues.append(queue)
return iters
With that in place, you main()
could look like this:
async def main():
async with websockets.connect("wss://echo.websocket.org") as ws:
foo_stream, bar_stream = broadcast(ws, 2)
asyncio.create_task(foo(foo_stream))
asyncio.create_task(bar(bar_stream))
await ws.send("Hello")
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.