Issue
I am trying to convert this converter (XML files from S3 to JSON) into a multithreaded application so I can speed up processing of multiple files (I have 985). Since each file is about 1 GB, I would like to send, say, 8 of these files to be parsed at a time.
Whenever I run this I get: RuntimeWarning: coroutine 'process_object' was never awaited
Here is the code at a high level:
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed (unknown) in {time.time() - start} seconds"
if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))
    loop.close()
Solution
I have modified and simplified your code. I don't know why you are combining thread-pool futures with asyncio; if you want to limit the number of concurrent workers, you can use an asyncio Semaphore instead.
Below is simplified code without concurrent.futures. It works for me, although I can't reproduce the above error exactly on my machine.
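For context on the warning: `loop.run_in_executor(executor, process_object, url)` calls `process_object(url)` in a worker thread, and calling an `async def` function does not run its body; it only creates a coroutine object, which nothing in that thread ever awaits. A minimal sketch (with a hypothetical `work` coroutine) showing the same effect:

```python
import asyncio

async def work(x):
    # the body only runs once the coroutine is awaited
    return x * 2

result = work(21)  # no await: this is a coroutine object, not 42
print(asyncio.iscoroutine(result))  # True
result.close()  # closing the unawaited coroutine suppresses the RuntimeWarning
```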
Try this:
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed (unknown) in {time.time() - start} seconds")
async def process_objects_bg(objects):
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp
if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
Answered By - Abhishek Singh