Issue
I am doing some heavy processing that needs async methods. One of my methods returns a list of dictionaries, and each dictionary needs to go through heavy processing before it is added to another awaitable object, e.g.:
    def cpu_bound_task_here(record):
        """some complicated preprocessing of record"""
        return record
After the answer given below by the kind person, my code is now just stuck.
    async def fun():
        print("Socket open")
        record_count = 0
        symbol = obj.symbol.replace("-", "").replace("/", "")
        loop = asyncio.get_running_loop()
        await obj.send()
        while True:
            try:
                records = await obj.receive()
                if not records:
                    continue
                record_count += len(records)
                with ProcessPoolExecutor() as executor:
                    await asyncio.gather(
                        *[
                            process_record(
                                record,
                                pipe,
                                loop,
                                executor,
                                stream_name,
                                handle_lob,
                                symbol,
                            )
                            for record in records
                        ]
                    )
                if not save:
                    continue
                if record_count >= max_record_count:
                    await pipe.execute()
                    print("pushed to redis")
                    record_count = 0
                continue

    async def process_record(record, pipe, loop, executor, stream_name, handle_lob, symbol):
        record = await loop.run_in_executor(
            executor, cpu_bound_task_here(record, stream_name, handle_lob, symbol)
        )
        print(record)
        await pipe.xadd(stream_name, record)
So what the above function does is stream values asynchronously, indefinitely, and do some heavy processing on them before pushing them to Redis. I made the necessary changes and now my code is stuck.
Solution
As that output tells you, run_in_executor returns a Future. You need to await it to get its result.
    record = await loop.run_in_executor(
        None, something_cpu_bound_task_here, record
    )
Note that any arguments to something_cpu_bound_task_here need to be passed to run_in_executor.
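To illustrate that note, here is a minimal sketch of passing arguments through run_in_executor. The callable and its positional arguments go in separately; writing cpu_bound_task_here(record, ...) inside the call would instead run the function right away on the event loop thread and hand its return value, not the function, to the executor. The body of cpu_bound_task_here, the argument values ("trades", "BTCUSD") and the helper name process are assumptions made up for illustration, and the default executor is used only to keep the sketch short.

```python
import asyncio
from functools import partial


def cpu_bound_task_here(record, stream_name, handle_lob=False, symbol=""):
    # Hypothetical stand-in for the real preprocessing step.
    return {**record, "stream": stream_name, "symbol": symbol, "lob": handle_lob}


async def process(record):
    loop = asyncio.get_running_loop()

    # Positional arguments follow the callable; the function is NOT called here.
    first = await loop.run_in_executor(None, cpu_bound_task_here, record, "trades")

    # run_in_executor takes no keyword arguments, so bind them with partial.
    bound = partial(
        cpu_bound_task_here, record, "trades", handle_lob=True, symbol="BTCUSD"
    )
    second = await loop.run_in_executor(None, bound)
    return first, second


if __name__ == "__main__":
    print(asyncio.run(process({"price": 1})))
```

The same call shape works with an explicit executor in place of None.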
Additionally, as you've mentioned that this is a CPU-bound task, you'll want to make sure you're using a concurrent.futures.ProcessPoolExecutor. Unless you've called loop.set_default_executor somewhere, the default is an instance of ThreadPoolExecutor.
    with ProcessPoolExecutor() as executor:
        for record in records:
            record = await loop.run_in_executor(
                executor, something_cpu_bound_task_here, record
            )
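If you want to see the difference between the two executors for yourself, a small sketch like the following compares process IDs. With None the work runs on a thread inside the event loop's own process, where pure-Python CPU-bound code still competes for the GIL on a standard CPython build; with a ProcessPoolExecutor it runs in a separate worker process. The names worker_pid and pid_with are hypothetical helpers, not part of your code.

```python
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor


def worker_pid():
    # Runs inside whichever executor is used and reports that process's ID.
    return os.getpid()


async def pid_with(executor=None):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, worker_pid)


async def main():
    print("event loop pid:  ", os.getpid())
    # Default executor: a thread in this process, so the pid is the same.
    print("default executor:", await pid_with())
    # Process pool: a separate worker process, so the pid differs.
    with ProcessPoolExecutor(max_workers=1) as executor:
        print("process pool:    ", await pid_with(executor))


if __name__ == "__main__":
    asyncio.run(main())
```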
Finally, your while loop is effectively running synchronously. You need to wait for the future and then for obj.add before moving on to process the next item in records. You might want to restructure your code a bit and use something like gather to allow for some concurrency.
    async def process_record(record, obj, loop, executor):
        record = await loop.run_in_executor(
            executor, something_cpu_bound_task_here, record
        )
        await obj.add(record)

    async def fun():
        loop = asyncio.get_running_loop()
        records = await receive()
        with ProcessPoolExecutor() as executor:
            await asyncio.gather(
                *[process_record(record, obj, loop, executor) for record in records]
            )
I'm not sure how to handle obj since that isn't defined in your example, but I'm sure you can figure that out.
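For reference, here is one way the gather approach could look as a complete script. It is only a sketch under stated assumptions: Sink is a hypothetical stand-in for obj, the body of something_cpu_bound_task_here is made-up work, and the executor is passed in as a parameter so that a single pool can be created once and reused instead of being rebuilt for every batch.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def something_cpu_bound_task_here(record):
    # Hypothetical CPU-bound step: a sum of squares up to record["n"].
    return {**record, "total": sum(i * i for i in range(record["n"]))}


class Sink:
    """Hypothetical stand-in for `obj`: collects records via an awaitable add()."""

    def __init__(self):
        self.items = []

    async def add(self, record):
        await asyncio.sleep(0)  # pretend this is real asynchronous I/O
        self.items.append(record)


async def process_record(record, obj, loop, executor):
    # Offload the heavy step, then hand the result to the awaitable sink.
    record = await loop.run_in_executor(
        executor, something_cpu_bound_task_here, record
    )
    await obj.add(record)


async def fun(records, obj, executor):
    loop = asyncio.get_running_loop()
    # All records are submitted at once and processed concurrently.
    await asyncio.gather(
        *(process_record(record, obj, loop, executor) for record in records)
    )


async def main():
    obj = Sink()
    with ProcessPoolExecutor() as executor:  # created once, reused for the batch
        await fun([{"n": 10}, {"n": 100}], obj, executor)
    print(obj.items)


if __name__ == "__main__":
    asyncio.run(main())
```

The order in which records reach the sink is not guaranteed, since each one is added as soon as its own processing finishes.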
Answered By - dirn