Issue
In the following code (CPython 3.9), I have an async client that's listening for messages. After receiving each message, if certain conditions are satisfied, it calls a blocking, CPU-intensive function gen_block.
Question 1: in the example below, gen_block blocks the loop (i.e. it prevents further messages from being read by read_msg), despite being called from run_in_executor()... what am I missing here?
Question 2: once gen_block is called, I don't want it to be called again until the original call has completed. What's the correct way to handle that state in the asyncio framework?
async def main(args):
    loop = asyncio.get_running_loop()
    reader, writer = await asyncio.open_connection(args.host, args.port)
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                await loop.run_in_executor(None, gen_block, writer)
    # except, finally, shutdown etc
Pointers to references / SO posts would be fine too; I couldn't find a relevant answer for this specific scenario of calling run_in_executor from within a loop.
Solution
When you write await loop.run_in_executor(None, gen_block, writer), the await means "wait until run_in_executor finishes its CPU-heavy task", so it is no different from calling gen_block directly in the loop: it blocks until the call finishes. You can remove the await keyword and keep track of the returned futures in a list, like so, which should stop things from blocking, since the work now runs in an executor thread:
async def main(args):
    tasks = []
    loop = asyncio.get_running_loop()
    reader, writer = await asyncio.open_connection(args.host, args.port)
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                tasks.append(loop.run_in_executor(None, gen_block, writer))
        # do something with the tasks list later, e.g. asyncio.gather
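To make the "do something with the tasks list later" step concrete, here is a minimal, self-contained sketch of scheduling executor calls without await and gathering them afterwards. The gen_block here is a toy stand-in for the real CPU-heavy function, not the asker's actual implementation:

```python
import asyncio

def gen_block(n: int) -> int:
    # toy stand-in for the CPU-heavy work
    return sum(range(n))

async def run_all() -> list:
    loop = asyncio.get_running_loop()
    tasks = []
    for n in (10, 20, 30):
        # schedule without awaiting, as in the snippet above
        tasks.append(loop.run_in_executor(None, gen_block, n))
    # later (e.g. at shutdown) wait for outstanding executor calls
    return await asyncio.gather(*tasks)

print(asyncio.run(run_all()))  # [45, 190, 435]
```

Because gather preserves submission order, the results come back in the order the calls were scheduled, even though they ran in executor threads.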
However, given your requirement that gen_block must finish before it can be called a second time, a queue is a good approach. When you're ready to do CPU-intensive work, you put the information needed to do it into a queue. You then have one worker task that reads work items from the queue and runs each one in a separate thread. Since there is only one worker, only one call to your CPU-intensive work runs at a time, while other requests queue up behind it.
Here is a simple example to illustrate the idea. We create a server on localhost that accepts simple text messages and puts them into a queue. Then, we have one worker pull from the queue and run the CPU-intensive work in a separate thread. Note that I use asyncio.to_thread here, which was introduced in 3.9 and abstracts away managing the executor for you.
import asyncio
import functools
from asyncio import Queue

def cpu_intensive(data: str):
    print(f"Running intense work for {data}")
    i = 0
    while i < 100000000:
        i = i + 1
    print(f"Finished intense work for {data}")

async def worker(queue: Queue):
    while True:
        work_item = await queue.get()
        await asyncio.to_thread(cpu_intensive, work_item)

async def handle_connect(queue: Queue, reader, writer):
    while data := await reader.readline():
        print(f"Queueing {data}")
        queue.put_nowait(data)

async def main():
    queue = Queue()
    asyncio.create_task(worker(queue))
    server = await asyncio.start_server(functools.partial(handle_connect, queue), '127.0.0.1', 8000)
    async with server:
        await server.serve_forever()

asyncio.run(main())
Running this and sending a, b, c as three messages to your server one after another, you'll see output similar to the following:
Queueing b'a\r\n'
Running intense work for b'a\r\n'
Queueing b'b\r\n'
Queueing b'c\r\n'
Finished intense work for b'a\r\n'
Running intense work for b'b\r\n'
Finished intense work for b'b\r\n'
Running intense work for b'c\r\n'
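For completeness, a queue-plus-worker isn't the only way to serialize the blocking calls: handing run_in_executor a ThreadPoolExecutor(max_workers=1) also guarantees one call at a time, because the pool's internal work queue lines the submissions up. A minimal sketch of that alternative (cpu_intensive here is a hypothetical stand-in, not the original function):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# a single-worker pool: calls submitted to it queue up internally
# and execute one at a time, never concurrently
blocking_pool = ThreadPoolExecutor(max_workers=1)

def cpu_intensive(tag: str) -> str:
    # hypothetical stand-in for the real CPU-bound work
    time.sleep(0.1)
    return f"done {tag}"

async def main() -> list:
    loop = asyncio.get_running_loop()
    # schedule three calls without awaiting each one; the pool
    # serializes them behind the scenes
    futures = [loop.run_in_executor(blocking_pool, cpu_intensive, t)
               for t in ("a", "b", "c")]
    return await asyncio.gather(*futures)

print(asyncio.run(main()))  # ['done a', 'done b', 'done c']
```

The trade-off versus the explicit queue is less control: you can't easily inspect, bound, or drop pending work, since the backlog lives inside the executor.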
Answered By - Matt Fowler