Issue
I have a small program that loads a fairly heavy CSV file (over 800 MB, read in chunks via pandas.read_csv to limit memory usage), performs a few API calls to servers "out in the wild", and finally builds a result object which is then stored in a database.
I have added caching for the network requests where possible, but even then the code takes over 10 hours to complete. When I profile it with py-spy, most of the time is spent waiting for network requests.
I tried converting it to asyncio to speed things up, and I have managed to get the code working on a small subset of the input file. With the full file, however, the memory use becomes prohibitive.
Here is what I have tried:
import asyncio

import httpx
import pandas as pd

async def process_item(item, client):
    # send a few requests with the httpx client
    # process the responses into res
    await save_results_to_db(res)

def get_items_from_csv():
    # lazily yields items from the heavy CSV file, one chunk at a time
    # (a plain generator, since nothing in here awaits)
    for chunk in pd.read_csv(filename, ...):
        for row in chunk.itertuples():
            item = item_from_row(row)
            yield item

async def main():
    async with httpx.AsyncClient() as client:
        tasks = []
        for item in get_items_from_csv():
            tasks.append(process_item(item, client))
        await asyncio.gather(*tasks)

asyncio.run(main())
Is there a way to avoid creating the tasks list, which becomes a very heavy object with over 1.5M items in it? The other downside is that no task seems to start until the entire file has been read, which is not ideal.
I'm using Python 3.7 but can easily upgrade to 3.8 if needed.
Solution
I think what you are looking for here is not running in batches but running N workers which concurrently pull tasks off a queue.
N = 10  # scale based on the processing power and memory you have

async def main():
    async with httpx.AsyncClient() as client:
        tasks = asyncio.Queue()
        for item in get_items_from_csv():
            tasks.put_nowait(process_item(item, client))

        async def worker():
            while not tasks.empty():
                # get_nowait() returns the next coroutine; await runs it
                await tasks.get_nowait()
            # for a long-running server, block on the queue instead
            # (the walrus operator requires Python 3.8+):
            # while task := await tasks.get():
            #     await task

        await asyncio.gather(*[worker() for _ in range(N)])
I used an asyncio.Queue here, but since all tasks get added to the queue before any worker starts, you could just as well use a collections.deque. The queue is especially useful for workers in a long-running process (e.g. a server) where items may be queued asynchronously.
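If memory is still tight, the queue pattern can be taken one step further: bound the queue and let a producer coroutine feed it while the workers drain it, so the 1.5M coroutines are never all in memory at once. Below is a minimal sketch of that variant; it reuses get_items_from_csv and process_item from the question, and QUEUE_SIZE and the None shutdown sentinel are my own illustrative choices, not part of the answer above.

import asyncio

import httpx

N = 10            # concurrent workers
QUEUE_SIZE = 100  # caps memory: put() suspends once the queue is full

async def bounded_main():
    async with httpx.AsyncClient() as client:
        queue = asyncio.Queue(maxsize=QUEUE_SIZE)

        async def producer():
            # Reads the CSV lazily; when the queue is full, put() suspends
            # until a worker takes an item, so reading keeps pace with processing.
            for item in get_items_from_csv():
                await queue.put(item)
            for _ in range(N):
                await queue.put(None)  # one shutdown sentinel per worker

        async def worker():
            while True:
                item = await queue.get()
                if item is None:
                    break
                await process_item(item, client)

        await asyncio.gather(producer(), *[worker() for _ in range(N)])

asyncio.run(bounded_main())

Note that pd.read_csv itself still blocks the event loop while a chunk is parsed; for a network-bound workload that is usually negligible, but it could be pushed to a thread if it becomes a bottleneck.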
Answered By - avi