Issue
I have two main threads (consumer and producer) communicating via a SimpleQueue. I want the consumer to execute a task as soon as something is added to the queue. I also want to avoid asyncio.Queue, since I want to keep the consumer and producer decoupled and flexible for future changes.
I started looking into gevent, asyncio, etc., but it all feels very confusing to me.
    from queue import SimpleQueue
    from time import sleep
    import threading

    q = SimpleQueue()
    q.put(1)
    q.put(2)
    q.put(3)

    def serve_forever():
        import asyncio
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        while True:
            task = q.get()
            print(f"Dequeued task {task}")
            async def run_task():
                print(f"Working on task {task}")
            loop.create_task(run_task())  # run task

    thread = threading.Thread(target=serve_forever)
    thread.daemon = True
    thread.start()
    sleep(1)
Output:
Dequeued task 1
Dequeued task 2
Dequeued task 3
Why doesn't run_task execute in my case?
Solution
Simply calling create_task doesn't actually run anything; you need a running asyncio event loop, which you get by calling something like asyncio.run or loop.run_until_complete.
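A minimal sketch of this point, assuming a hypothetical coroutine named `work`: a task created on a loop that is not running is only scheduled, and it executes once the loop is driven with `run_until_complete`.

```python
import asyncio

results = []

async def work():
    # Hypothetical coroutine used only for illustration.
    results.append("ran")

loop = asyncio.new_event_loop()
task = loop.create_task(work())   # scheduled, but nothing runs yet
ran_before = list(results)        # still empty: the loop is not running

loop.run_until_complete(task)     # drive the loop until the task finishes
ran_after = list(results)
loop.close()

print(ran_before, ran_after)      # [] ['ran']
```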
You don't need to create an explicit loop as you're doing, either; asyncio.run creates and manages an event loop for you.
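As a rough sketch of the simpler route, `asyncio.run` creates a loop, runs the given coroutine to completion, and closes the loop afterwards. The names `main` and `work` below are hypothetical and only serve the illustration.

```python
import asyncio

async def work(n):
    # Hypothetical coroutine: doubles its input.
    return n * 2

async def main():
    # create_task needs a running loop; asyncio.run provides one.
    task = asyncio.create_task(work(21))
    return await task

result = asyncio.run(main())
print(result)  # 42
```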
Lastly, asyncio tasks won't run if you never call await, because await is how the current task yields execution time to other tasks. So even if we fix the earlier problems, your tasks will never execute because execution will be stuck inside your while loop. We need to be able to await the q.get() calls, which isn't directly possible when using queue.SimpleQueue.
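To illustrate the role of await, here is a small sketch with hypothetical coroutines `background` and `busy`. Without an await inside the loop, the background task only gets a turn after the loop has finished; with `await asyncio.sleep(0)` it runs as soon as the loop first yields.

```python
import asyncio

async def background(log):
    log.append("background")

async def busy(log, yield_control):
    task = asyncio.create_task(background(log))
    for _ in range(3):
        log.append("loop")
        if yield_control:
            await asyncio.sleep(0)  # hand control back to the event loop
    await task  # make sure the background task has finished
    return log

no_yield = asyncio.run(busy([], yield_control=False))
with_yield = asyncio.run(busy([], yield_control=True))

print(no_yield)    # ['loop', 'loop', 'loop', 'background']
print(with_yield)  # ['loop', 'background', 'loop', 'loop']
```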
We can solve the above, while still using queue.SimpleQueue, by using the run_in_executor method to run the non-async q.get calls (this runs the calls in a separate thread and allows us to asynchronously wait for the result). The following code works as I think you intended:
    import asyncio
    import queue
    import threading

    q = queue.SimpleQueue()
    q.put(1)
    q.put(2)
    q.put(3)

    async def run_task(task):
        print(f"Working on task {task}")

    async def serve_forever():
        loop = asyncio.get_running_loop()
        while True:
            # q.get blocks, so run it in the default thread pool and await the result
            task = await loop.run_in_executor(None, q.get)
            print(f"Dequeued task {task}")
            asyncio.create_task(run_task(task))  # run task

    def thread_main():
        # asyncio.run creates the event loop for this thread and runs it
        asyncio.run(serve_forever())

    thread = threading.Thread(target=thread_main)
    thread.daemon = True
    thread.start()
    # just hang around waiting for thread to exit (which it never will)
    thread.join()
Output:
Dequeued task 1
Working on task 1
Dequeued task 2
Working on task 2
Dequeued task 3
Working on task 3
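If you also want the consumer thread to exit cleanly, one possible variation (not part of the answer above) is to have the producer send a sentinel. In this sketch the `STOP` sentinel, the `pending` set, and the `handled` list are assumptions added for illustration; the `run_in_executor` pattern itself is unchanged.

```python
import asyncio
import queue
import threading

q = queue.SimpleQueue()
STOP = object()   # hypothetical sentinel telling the consumer to exit
handled = []      # records processed items so the result can be inspected

async def run_task(task):
    handled.append(task)

async def serve():
    loop = asyncio.get_running_loop()
    pending = set()
    while True:
        # Wait for the next item without blocking the event loop.
        item = await loop.run_in_executor(None, q.get)
        if item is STOP:
            break
        t = asyncio.create_task(run_task(item))
        pending.add(t)                        # keep a reference to the task
        t.add_done_callback(pending.discard)  # drop it once finished
    if pending:
        await asyncio.gather(*pending)        # let remaining tasks finish

def producer():
    for i in range(1, 4):
        q.put(i)
    q.put(STOP)

consumer = threading.Thread(target=lambda: asyncio.run(serve()))
producer_thread = threading.Thread(target=producer)
consumer.start()
producer_thread.start()
producer_thread.join()
consumer.join()

print(handled)  # [1, 2, 3]
```

The producer only touches the SimpleQueue, so it stays independent of asyncio, which matches the decoupling the question asks for.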
Answered By - larsks