Issue
I have a "listener" loop that constantly watches for items to process from an asyncio queue. This loop runs in a part of the application that is not using asyncio, so I've been trying to set up a passive asyncio main loop that the listener can be transferred to as needed. The listener is started and stopped as needed per input from the user.
For some reason, the code below never results in `listener()` actually running (i.e. `print("Listener Running")` is never reached, so nothing is printed). `start_IOLoop_thread` is run at startup of the application.
Can anyone point out what the problem is with this setup? Please let me know if more info is needed.
Edit: replaced code with a runnable example per the comments:
import asyncio
import threading
from asyncio.queues import Queue
import time
class Client:
    """Client that tries to run a queue listener on a background asyncio loop.

    Construction starts a daemon thread intended to host an event loop and
    then submits the listener coroutine to ``self.loop``.

    NOTE: this is the listing the question is about. ``start_IO`` stores a
    new loop in ``self.loop`` but then calls ``asyncio.run(main())``, which
    creates and runs its own separate event loop. ``self.loop`` is therefore
    never run and the listener submitted to it never executes.
    """

    def __init__(self):
        # Queue the listener is meant to drain.
        self.streamQ = Queue()
        # Assigned by the background thread in start_IOLoop_thread().
        self.loop = None
        self.start_IOLoop_thread()
        self.stream_listener()

    def stream_listener(self):
        """Submit the listener coroutine to ``self.loop`` from this thread."""
        self.streaming = True

        async def listener():
            # Print every item taken from the queue while streaming is set.
            print("Listener Running")
            while self.streaming:
                data = await self.streamQ.get()
                # DEBUG
                print(data)
            print("Listener Stopped")

        print("Starting Listener")
        # Returns a concurrent.futures.Future; the coroutine only runs if
        # self.loop is actually running.
        self.listener = asyncio.run_coroutine_threadsafe(listener(), self.loop)

    def start_IOLoop_thread(self):
        """Start a daemon thread meant to host the passive event loop."""

        async def inf_loop():
            # Keep the main thread alive and doing nothing
            # so we can freely give it tasks as needed
            while True:
                await asyncio.sleep(1)

        async def main():
            await inf_loop()

        def start_IO():
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            # asyncio.run() always creates a fresh loop for main(); the loop
            # stored in self.loop above is left idle.
            asyncio.run(main())
            print("Main Exited")

        threading.Thread(target=start_IO, daemon=True).start()
        # A small delay is needed to give the loop time to initialize,
        # otherwise self.loop is passed as "None"
        time.sleep(0.1)
if __name__ == "__main__":
    # Constructing the client starts the background thread and submits
    # the listener coroutine.
    C = Client()
    # Block the main thread; the background thread is a daemon and would
    # otherwise be torn down as soon as the script finishes.
    input("Enter to exit")
Solution
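You never start the newly created loop: `asyncio.run(main())` creates and runs its own, separate event loop, so `self.loop` (the loop the listener is submitted to) never runs. I adjusted the code to schedule `main` on `self.loop` and to run that loop with `run_forever()` (although `main` does nothing here, I assume the original code is more complex). The fix itself is confined to `start_IO`. Tested with Python 3.10 (I think there was some change in the past regarding threads and async).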
import asyncio
import threading
from asyncio.queues import Queue
import time
class Client:
def __init__(self):
self.streamQ = Queue()
self.loop = None
self.start_IOLoop_thread()
self.stream_listener()
def stream_listener(self):
self.streaming = True
async def listener():
print("Listener Running")
while self.streaming:
data = await self.streamQ.get()
# DEBUG
print(data)
print("Listener Stopped")
print("Starting Listener")
self.listener = asyncio.run_coroutine_threadsafe(listener(), self.loop)
def start_IOLoop_thread(self):
async def inf_loop():
# Keep the main thread alive and doing nothing
# so we can freely give it tasks as needed
while True:
await asyncio.sleep(1)
async def main():
await inf_loop()
def start_IO():
self.loop = asyncio.new_event_loop()
self.loop.create_task(main())
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
print("Main Exited")
threading.Thread(target=start_IO, daemon=True).start()
# A small delay is needed to give the loop time to initialize,
# otherwise self.loop is passed as "None"
time.sleep(0.1)
if __name__ == "__main__":
    # Constructing the client starts the background loop thread and the
    # listener.
    C = Client()
    # Block the main thread; the loop thread is a daemon and would
    # otherwise be torn down as soon as the script finishes.
    input("Enter to exit")
Answered By - vladmihaisima
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.