Issue
I want to use the PyAudio
library in an async context, but the main entry point for the library only has a callback-based API:
import pyaudio
def callback(in_data, frame_count, time_info, status):
# Do something with data
pa = pyaudio.PyAudio()
self.stream = self.pa.open(
stream_callback=callback
)
How I'm hoping to use it is something like this:
pa = SOME_ASYNC_COROUTINE()
async def listen():
async for block in pa:
# Do something with block
The problem is, I'm not sure how to convert this callback syntax to a future that completes when the callback fires. In JavaScript I would use promise.promisify()
, but Python doesn't seem to have anything like that.
Solution
An equivalent of promisify
wouldn't work for this use case for two reasons:
- PyAudio's async API doesn't use the asyncio event loop - the documentation specifies that the callback is invoked from a background thread. This requires precautions to correctly communicate with asyncio.
- The callback cannot be modeled by a single future because it is invoked multiple times, whereas a future can only have one result. Instead, it must be converted to an async iterator, just as shown in your sample code.
Here is one possible implementation:
def make_iter():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def put(*args):
loop.call_soon_threadsafe(queue.put_nowait, args)
async def get():
while True:
yield await queue.get()
return get(), put
make_iter
returns a pair of <async iterator, put-callback>. The returned objects hold the property that invoking the callback causes the iterator to produce its next value (the arguments passed to the callback). The callback may be called from an arbitrary thread and is thus safe to pass to pyaudio.open
. The async iterator should be used with async for
in an asyncio coroutine, which will correctly suspend while waiting for the callback to supply the next value:
async def main():
stream_get, stream_put = make_iter()
stream = pa.open(stream_callback=stream_put)
stream.start_stream()
async for in_data, frame_count, time_info, status in stream_get:
# ...
asyncio.get_event_loop().run_until_complete(main())
Note that, according to the documentation, the callback must also return a meaningful value, a tuple of frames and a Boolean flag. This can be incorporated in the design by changing the fill
function to also receive the data from the asyncio side. The implementation is not included because it might not make much sense without an understanding of the domain.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.