Issue
Objective
My objective is to consume an audio stream. Logically, this is what I want to achieve:
- The audio stream comes in through WebSocket A (a FastAPI endpoint).
- The audio stream is bridged to a different WebSocket, B (Rev.ai's WebSocket), which returns JSON.
- The JSON results are sent back through WebSocket A in real time, that is, while the audio stream is still coming in.
Possible solution
To solve this problem, I've had quite a few ideas, but ultimately I've been trying to bridge WebSocket A to WebSocket B. My attempt so far involves a ConnectionManager class, which contains a queue.Queue. The chunks of the audio stream are added to this queue so that we do not consume directly from WebSocket A.
The ConnectionManager also contains a generator method that yields all values from the queue.
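A minimal sketch of what such a ConnectionManager might look like is shown here. Only add_to_buffer and disconnect appear in the endpoint code; the media_generator name, the None end-of-stream sentinel, and the omission of connect() are assumptions made for illustration.

```python
import queue
from typing import Iterator, Optional


class ConnectionManager:
    """Hypothetical sketch: buffers incoming audio chunks in a thread-safe queue."""

    _STOP = None  # assumed sentinel marking the end of the stream

    def __init__(self) -> None:
        self.buffer: "queue.Queue[Optional[bytes]]" = queue.Queue()

    def add_to_buffer(self, chunk: bytes) -> None:
        self.buffer.put(chunk)

    def disconnect(self) -> None:
        # Unblock the generator so that its consumer can finish.
        self.buffer.put(self._STOP)

    def media_generator(self) -> Iterator[bytes]:
        # get() blocks, so this generator must be consumed outside the event loop thread.
        while True:
            chunk = self.buffer.get()
            if chunk is self._STOP:
                return
            yield chunk


manager = ConnectionManager()
manager.add_to_buffer(b"chunk-1")
manager.add_to_buffer(b"chunk-2")
manager.disconnect()
chunks = list(manager.media_generator())
```

Because the generator blocks on queue.Queue.get(), iterating it directly inside a coroutine would stall the event loop, which is part of why the concurrency is hard to get right.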
My FastAPI implementation consumes from WebSocket A like this:
@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()
Concurrently with this ingestion, I'd like to have a task that bridges our audio stream to WebSocket B and sends the values it obtains back to WebSocket A. The audio stream could be consumed through the aforementioned generator method.
The generator method is necessary due to how WebSocket B consumes messages, as shown in Rev-ai's examples:
streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)
This is one of the biggest challenges, as we need to be consuming data into a generator and getting the results in real-time.
Latest attempts
I've been trying my luck with asyncio; from what I understand, one possibility would be to create a coroutine that runs in the background. I've been unsuccessful with this, but it sounded promising.
I've thought about triggering this through the FastAPI startup event, but I'm having trouble achieving concurrency. I tried to use event loops directly, but that gave me an error related to nested event loops.
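The exact call that produced the error is not shown here. As a hedged illustration, one common way to hit this kind of error is calling loop.run_until_complete() from code that is already running inside the loop; scheduling the coroutine with asyncio.create_task() avoids it. The background_job name is hypothetical.

```python
import asyncio


async def background_job(results: list) -> None:
    await asyncio.sleep(0)
    results.append("job done")


async def main() -> list:
    results: list = []
    loop = asyncio.get_running_loop()

    # Trying to drive the already-running loop a second time is refused by asyncio.
    coro = background_job(results)
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        results.append(f"RuntimeError: {exc}")
        coro.close()  # the coroutine never started; close it to avoid a warning

    # Scheduling a task on the running loop works instead.
    task = asyncio.create_task(background_job(results))
    await task
    return results


results = asyncio.run(main())
```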
Caveat
FastAPI can be optional if your insight deems so, and in a way so is WebSocket A. At the end of the day, the ultimate objective is to receive an audio stream through our own API endpoint, run it through Rev.ai's WebSocket, do some extra processing, and send the results back.
Solution
Bridge for websocket <-> websocket
Below is a simple example of a WebSocket proxy, where WebSocket A and WebSocket B are both endpoints in the same FastAPI app. WebSocket B could be hosted anywhere else; just change its address, ws_b_uri. The websockets library is used as the WebSocket client.
To forward data, the code of the A endpoint starts two tasks, forward and reverse, and waits for their completion with asyncio.gather(). Data is transferred in both directions concurrently.
import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets

app = FastAPI()
ws_b_uri = "ws://localhost:8001/ws_b"

async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    # Relay binary frames from client A to server B.
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)

async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    # Relay text frames from server B back to client A.
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)

@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)

@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    # Stand-in for the remote service: answers every chunk with a JSON string.
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server received:", data)
        await ws_b_server.send_text('{"response": "value from B server"}')
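Both loops above run until one side fails or disconnects. A possible refinement, sketched here under simplifying assumptions, is to stop the other direction as soon as one direction ends. To keep the sketch runnable without a server, asyncio.Queue objects stand in for the two sockets and None marks a closed connection; in the real endpoint the end condition would instead be a disconnect exception such as WebSocketDisconnect or ConnectionClosed. The pump, bridge and demo names are hypothetical.

```python
import asyncio


async def pump(source: asyncio.Queue, sink: asyncio.Queue) -> None:
    # Copy items until the source signals end-of-stream with None.
    while True:
        item = await source.get()
        if item is None:
            return
        await sink.put(item)


async def bridge(a_in, b_out, b_in, a_out) -> None:
    tasks = {
        asyncio.create_task(pump(a_in, b_out)),  # forward: A -> B
        asyncio.create_task(pump(b_in, a_out)),  # reverse: B -> A
    }
    # Return as soon as either direction ends, then stop the other one.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raise any error from the finished direction


async def demo():
    a_in, b_out, b_in, a_out = (asyncio.Queue() for _ in range(4))
    await a_in.put(b"audio-1")
    await b_in.put('{"text": "hello"}')
    await a_in.put(None)  # side A closes
    await bridge(a_in, b_out, b_in, a_out)
    return b_out.get_nowait(), a_out.get_nowait()


result = asyncio.run(demo())
```

The design choice is asyncio.wait(..., return_when=FIRST_COMPLETED) instead of asyncio.gather(): gather() does not cancel the remaining task when one of them finishes, whereas here the pending direction is cancelled explicitly.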
Update (Bridge websocket <-> sync generator)
Considering the last update of the question, the issue is that WebSocket B is hidden behind a synchronous generator (in fact two of them, one for the input and one for the output), so the task becomes how to build a bridge between a WebSocket and a synchronous generator. Since I have never worked with the rev-ai library, I made a stub function, stream_client_start, in place of streamclient.start: it takes a generator (MEDIA_GENERATOR in the original) and returns a generator (response_generator in the original).
In this case, I process the generators in a separate thread via run_in_executor. So as not to reinvent the wheel, I use a queue from the janus library for communication; it lets synchronous and asynchronous code share a single queue. Accordingly, there are two queues, one for A -> B and the other for B -> A.
import asyncio
import queue
import time
from typing import Generator

from fastapi import FastAPI
from fastapi import WebSocket
import janus

app = FastAPI()

# Stub generator function (using websocket B internally)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)
        yield f"Get {chunk}"

# Auxiliary adapter: exposes a synchronous queue as a generator
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        yield sync_queue.get()

async def forward(ws_a: WebSocket, queue_b):
    # Push audio chunks from websocket A into the A -> B queue.
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await queue_b.put(data)

async def reverse(ws_a: WebSocket, queue_b):
    # Send results from the B -> A queue back through websocket A.
    while True:
        data = await queue_b.get()
        await ws_a.send_text(data)
        print("websocket A sent:", data)

def process_b_client(fwd_queue, rev_queue):
    # Runs in a worker thread: drives the synchronous generators.
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
        rev_queue.put(r)

@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_running_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()
    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)
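The code above has no explicit shutdown path: queue_to_generator blocks on get() forever, so the worker thread never exits on its own. One possible variant is sketched here using only the standard library, with a plain queue.Queue for the A -> B direction and loop.call_soon_threadsafe() feeding an asyncio.Queue for the B -> A direction, in place of janus. The _STOP sentinel and the demo function are assumptions made for illustration, and stream_client_start remains a stub rather than the real rev-ai client.

```python
import asyncio
import queue

_STOP = object()  # assumed sentinel marking the end of a stream


def stream_client_start(input_gen):
    # Stub with the same role as in the answer: generator in, generator out.
    for chunk in input_gen:
        yield f"Get {chunk}"


def queue_to_generator(sync_queue):
    # Yield items until the sentinel arrives, then finish the generator.
    while True:
        item = sync_queue.get()
        if item is _STOP:
            return
        yield item


def process_b_client(fwd_queue, rev_queue, loop):
    # Runs in a worker thread; results are handed to the event loop thread-safely.
    for response in stream_client_start(queue_to_generator(fwd_queue)):
        loop.call_soon_threadsafe(rev_queue.put_nowait, response)
    loop.call_soon_threadsafe(rev_queue.put_nowait, _STOP)


async def demo():
    loop = asyncio.get_running_loop()
    fwd_queue = queue.Queue()    # async producer -> worker thread
    rev_queue = asyncio.Queue()  # worker thread -> async consumer
    worker = loop.run_in_executor(None, process_b_client, fwd_queue, rev_queue, loop)

    for chunk in (b"a", b"b"):
        fwd_queue.put_nowait(chunk)  # unbounded queue, so this never blocks the loop
    fwd_queue.put_nowait(_STOP)      # e.g. when websocket A disconnects

    responses = []
    while (item := await rev_queue.get()) is not _STOP:
        responses.append(item)
    await worker
    return responses


responses = asyncio.run(demo())
```

In the endpoint, the sentinel would be put on the forward queue when websocket A disconnects, which lets the worker thread and the reverse loop finish in order.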
Answered By - alex_noname