Issue
I have a function that looks for relationships 2 levels deep.
First, the function gets a company from the database and looks for the people related to it. It then loops over those people to schedule asyncio.create_task(async_func()) for each one, as such:
async def get_company_related_data(self, bno: str, uuid: str = ""):
    people = []
    base_company = self.get_company_by_bno(bno, get_dict_objects=False)[0]
    people.extend(await base_company.get_related_people(convert_data=False))
    ...
    task_list = []
    for person in people:
        task_list.append(process_for_bubble_chart(person))
    results = await asyncio.gather(*task_list)
Here, the idea is to grab a company's related people first through the base_company.get_related_people() method. Once I've gotten those people, I iterate through them and:
1 - Set up tasks to process_for_bubble_chart() so that they can run at the same time (there could be 20+ people and each of them could be related to multiple companies).
2 - Await ALL results (at least I think I am...) by passing all tasks to the asyncio.gather() function.
3 - Do the same thing for each person, as you can see below.
The process_for_bubble_chart() function:
import asyncio
from typing import Union

async def process_for_bubble_chart(
    person: Union[GcisCompanyInfoPerson, GcisLimitedPartnerPerson],
    convert_to_data: bool = True,
):
    """
    Function that fetches related entities from the database
    based on the people objects within the 'people' list.
    """
    related_entities = []
    try:
        task_list = [
            person.get_related_companies(),
            person.get_related_businesses(),
            person.get_related_limited_partners(),
            person.get_related_factories(),
            person.get_related_stockcompanies()
        ]
        # Await inside the try block so errors raised by the tasks are caught here.
        results = await asyncio.gather(*task_list)
    except Exception as err:
        # Exception stuff
        ...
    else:
        for task_res in results:
            related_entities.extend(task_res)
    if convert_to_data:
        data = person.to_relation_data_object()
        data.update({"related": related_entities})
        return data
    return related_entities
And the get_related_XXX() methods look like this (more or less the same code returning different objects):
async def get_related_companies(self, exclude_bno: bool = True):
    sql = """
    SELECT * FROM Companies WHERE ...
    """
    # SQL fetch logic
    return [GcisCompanyInfo1(row) for row in query_db(sql)]
Where query_db() is just a wrapper function for querying the database.
Before I implemented async, the full set of queries took too long (~20 sec.), so I looked into using the asyncio module to speed things up, but the total run time stayed about the same (if not slightly longer!). How do I improve this?
This code runs as a FastAPI backend.
Solution
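async functions don't magically run in parallel - they only overlap when you ultimately use await on some operation which waits for a specific event to occur (common examples are things like socket reads or timed sleeps). In your code, query_db() is a regular blocking call, so each get_related_XXX() coroutine runs to completion without ever yielding to the event loop, and asyncio.gather() ends up running them one after another. If you had an async query_db function that can query the database asynchronously, then that may allow you to parallelize the operation.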
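To make the difference concrete, here is a minimal sketch that times the same five "queries" twice: once with a blocking call inside the coroutine, and once with a real await. Both fake_blocking_query and fake_async_query are hypothetical stand-ins (time.sleep and asyncio.sleep play the role of the database round trip), and DELAY is an arbitrary value chosen for illustration; none of them are part of the original code.

```python
import asyncio
import time

DELAY = 0.2  # pretend each query takes 0.2 s (assumed value for illustration)


async def fake_blocking_query(n: int) -> int:
    # Declared async, but time.sleep() never yields to the event loop,
    # much like a synchronous query_db() call inside an async method.
    time.sleep(DELAY)
    return n


async def fake_async_query(n: int) -> int:
    # asyncio.sleep() suspends this coroutine, so the others can run meanwhile.
    await asyncio.sleep(DELAY)
    return n


async def timed(query) -> tuple[list[int], float]:
    # Run five calls through gather() and measure the wall-clock time.
    start = time.perf_counter()
    results = await asyncio.gather(*(query(i) for i in range(5)))
    return results, time.perf_counter() - start


async def main() -> None:
    _, blocking_elapsed = await timed(fake_blocking_query)
    _, async_elapsed = await timed(fake_async_query)
    print(f"blocking inside async: {blocking_elapsed:.2f}s")
    print(f"awaiting async I/O:    {async_elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The first timing should come out close to five times DELAY and the second close to a single DELAY, which mirrors what happens when gather() is handed coroutines that never actually await anything.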
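In the absence of such an async operation, you may consider standard threads instead, using e.g. asyncio.get_running_loop().run_in_executor(None, process_for_bubble_chart, person) to run a non-async function in a thread. Note that the callable passed to run_in_executor() must be a regular (non-async) function, so process_for_bubble_chart and the get_related_XXX() methods it calls would need to be plain def functions for this to work.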
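As a rough sketch of the thread-based route, the snippet below pushes a blocking function onto the event loop's default thread pool and gathers the results. Here query_db_blocking is a hypothetical stand-in for the real query_db() wrapper (it just sleeps), and fetch_all is an illustrative helper name, not something from the original code.

```python
import asyncio
import time


def query_db_blocking(sql: str) -> list[str]:
    # Hypothetical stand-in for the blocking query_db() wrapper.
    time.sleep(0.2)
    return [f"row for: {sql}"]


async def fetch_all(statements: list[str]) -> list[list[str]]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    futures = [
        loop.run_in_executor(None, query_db_blocking, sql)
        for sql in statements
    ]
    # gather() returns the results in the same order as the statements.
    return await asyncio.gather(*futures)


if __name__ == "__main__":
    queries = [f"SELECT {i}" for i in range(5)]
    start = time.perf_counter()
    rows = asyncio.run(fetch_all(queries))
    print(len(rows), f"{time.perf_counter() - start:.2f}s")
```

On Python 3.9 and later, asyncio.to_thread(query_db_blocking, sql) is a shorter way to do the same thing. Whether this helps in practice depends on the database driver and connection handling being safe to use from several threads at once, which the question does not say.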
Answered By - nneonneo