Issue
Apologies, because I've seen this question asked a bunch, but having looked through all of those questions, none of them seem to fix my problem. My code looks like this:
TDSession = TDClient()
TDSession.grab_refresh_token()
q = queue.Queue(10)
asyncio.run(listener.startStreaming(TDSession, q))
while True:
    message = q.get()
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)
I have also tried asyncio.create_task(listener.startStreaming(TDSession, q)) instead, but the problem is that I get:
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'startStreaming' was never awaited
This confused me, because the same approach seemed to work in "Can an asyncio event loop run in the background without suspending the Python interpreter?", which is what I'm trying to do.
The listener.startStreaming function looks like this:
async def startStreaming(TDSession, q):
    streamingClient = TDSession.create_streaming_session()
    streamingClient.account_activity()
    await streamingClient.build_pipeline()
    while True:
        message = await streamingClient.start_pipeline()
        message = parseMessage(message)
        if message != None:
            print('putting message into q')
            print( dict(message) )
            q.put(message)
Is there a way to make this work where I can run the listener in the background?
EDIT: I've tried this as well, but it only runs the consumer function, instead of running both at the same time:
TDSession.grab_refresh_token()
q = queue.Queue(10)
loop = asyncio.get_event_loop()
loop.create_task(listener.startStreaming(TDSession, q))
loop.create_task(consumer(TDSession, q))
loop.run_forever()
Solution
As you found out, the asyncio.run function runs the given coroutine until it is complete. In other words, it waits for the coroutine returned by listener.startStreaming to finish before proceeding to the next line. Because startStreaming loops forever, that never happens, so your while loop is never reached.
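To make the blocking behaviour concrete, here is a minimal sketch. The short_job coroutine and the run_and_record helper are hypothetical stand-ins, not part of your code; the point is only that nothing after asyncio.run executes until the coroutine has returned.

```python
import asyncio


async def short_job(events):
    # Hypothetical stand-in for a coroutine that eventually finishes.
    events.append("job started")
    await asyncio.sleep(0.01)
    events.append("job finished")
    return "done"


def run_and_record():
    events = []
    # asyncio.run blocks here until short_job() has returned.
    result = asyncio.run(short_job(events))
    events.append("after asyncio.run")
    return result, events


print(run_and_record())
```

If short_job contained a `while True` loop with no exit, as startStreaming does, the "after asyncio.run" step would never be reached.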
Using asyncio.create_task, on the other hand, requires the caller to already be running inside an asyncio event loop. From the docs:
The task is executed in the loop returned by get_running_loop(), RuntimeError is raised if there is no running loop in current thread.
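A small sketch of that difference, assuming a placeholder coroutine named tick (hypothetical, only for illustration): calling create_task from ordinary synchronous code raises RuntimeError, while the same call inside a coroutine started by asyncio.run succeeds.

```python
import asyncio


async def tick():
    # Hypothetical placeholder coroutine.
    await asyncio.sleep(0)
    return "tick"


def schedule_outside_loop():
    coro = tick()
    try:
        asyncio.create_task(coro)  # no event loop is running in this thread
    except RuntimeError as exc:
        coro.close()  # close it so Python does not warn "never awaited"
        return str(exc)
    return None


async def schedule_inside_loop():
    # A loop is running here, because asyncio.run() started it.
    task = asyncio.create_task(tick())
    return await task


print(schedule_outside_loop())
print(asyncio.run(schedule_inside_loop()))
```

The "coroutine 'startStreaming' was never awaited" warning in your traceback comes from the same situation: the coroutine object was created, but create_task failed before anything could run it.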
What you need is to combine the two: write an async function, call create_task inside that function, and start the function with asyncio.run.
For example:
import asyncio

async def main():
    TDSession = TDClient()
    TDSession.grab_refresh_token()
    q = asyncio.Queue(10)
    # Schedule the producer on the running loop; it runs whenever main() awaits.
    streaming_task = asyncio.create_task(listener.startStreaming(TDSession, q))
    while True:
        message = await q.get()  # yields to the loop so startStreaming can run
        print('oh shoot!')
        print(message)
        orderEntry.placeOrder(TDSession=TDSession)
    await streaming_task  # If you want to wait for `startStreaming` to complete after the while loop

if __name__ == '__main__':
    asyncio.run(main())
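Edit: From your comment I realized you want to use the producer-consumer pattern, so I also updated the example above to use asyncio.Queue instead of queue.Queue. A blocking q.get() on a queue.Queue would stall the whole event loop, whereas awaiting asyncio.Queue.get() lets the loop switch between the producer (startStreaming) and the consumer (the while loop). Note that asyncio.Queue.put is a coroutine, so startStreaming needs to call await q.put(message) rather than q.put(message).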
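For reference, here is a self-contained sketch of the same producer-consumer pattern that runs without the TD Ameritrade client. The producer coroutine, its messages, and the None sentinel are assumptions made for illustration; they stand in for startStreaming and are not part of the original code.

```python
import asyncio


async def producer(q, n):
    # Hypothetical stand-in for startStreaming: emits n messages, then stops.
    for i in range(n):
        await asyncio.sleep(0)        # pretend to wait on the network
        await q.put(f"message {i}")   # asyncio.Queue.put must be awaited
    await q.put(None)                 # sentinel telling the consumer to stop


async def main(n=3):
    q = asyncio.Queue(10)
    # Schedule the producer; it runs whenever main() is suspended on an await.
    producer_task = asyncio.create_task(producer(q, n))
    received = []
    while True:
        message = await q.get()       # suspends main() until a message arrives
        if message is None:
            break
        received.append(message)
    await producer_task               # reachable here because the loop exits
    return received


print(asyncio.run(main()))
```

Both coroutines share a single thread, so each one only makes progress while the other is suspended at an await. A long synchronous call in the consumer would therefore pause the producer for its whole duration.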
Answered By - Maurice Lam