Issue
I am trying to read a big file and export batches of CSV files using asyncio. I know asyncio does not support concurrent writes to the same file, so I am exporting to an individual file per task, named by batch number. But it only runs synchronously.
My main.py has a function start():
def start():
    asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))
My processing.py has a function test_async():
async def test_async(dictRunData):
    num_logical_cpus = multiprocessing.cpu_count()
    with open(dictRunData['input_file'], 'r') as infile:
        content = infile.read().replace('\n', '')
    lstcontent = ast.literal_eval(content)
    tasks = []
    chunkNum = 0
    chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
    print(f"number of chunks: {len(chunk_contents)}")
    for chunk in chunk_contents:
        chunkNum += 1
        # dictRunData must be passed through: process_chunk_async expects it
        task = asyncio.create_task(process_chunk_async(chunk, chunkNum, dictRunData))
        tasks.append(task)
    result = await asyncio.gather(*tasks, return_exceptions=True)
Here is the function that processes a given chunk:
async def process_chunk_async(chunk, chunkNum, dictRunData):
    dict_results = {}
    for data in chunk:
        # ..do something..
        dict_results[data] = ...  # a dict has no .append(); store under a key
    outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)
Here is write_chunk_async:
async def write_chunk_async(dict_results, chunkNum, dictRunData):
    fileName = f"_{chunkNum}.csv"
    with open(fileName, "a+") as writeFileTo:
        for data in dict_results.keys():
            writeFileTo.write(data + "\n")
    print(f"Done write_chunk_async file: {fileName}")
    return fileName  # so the caller's outputfile is meaningful
Solution
asyncio only provides concurrency if you are using its APIs to do asynchronous I/O. In your sample code, all of your I/O (reading/writing files) is done using synchronous, blocking APIs, so using asyncio doesn't add any value. Moreover, asyncio doesn't actually provide any APIs for asynchronous reading/writing of files, because asynchronous file I/O is not well supported at the operating-system level. See this explanation from the Python wiki.
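A minimal sketch of the standard workaround: keep the file I/O blocking, but run it in a worker thread via asyncio.to_thread (Python 3.9+) so the event loop stays free. The file names and row data here are made-up examples, not from the question:

```python
import asyncio

def write_chunk(rows, chunk_num):
    # Plain blocking file I/O; safe because it runs in a worker thread.
    file_name = f"_{chunk_num}.csv"
    with open(file_name, "w") as f:
        for row in rows:
            f.write(row + "\n")
    return file_name

async def main():
    # asyncio.to_thread schedules each blocking call on the default
    # thread pool; gather preserves the order of the awaitables.
    return await asyncio.gather(
        asyncio.to_thread(write_chunk, ["a,1", "b,2"], 1),
        asyncio.to_thread(write_chunk, ["c,3"], 2),
    )

# asyncio.run(main()) -> ['_1.csv', '_2.csv']
```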
There is a third-party library, aiofiles, which provides an asyncio-friendly API for file I/O, but it just delegates all the work to background threads under the covers, so there's really no reason to use it unless you're trying to integrate file I/O into an application that already uses asyncio. If all your application does is read/write files, just use threads directly. Keep in mind, though, that if all your threads are reading/writing files on the same disk, multithreading may not help much either, since ultimately all the threads will contend with each other for access to that single disk.
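For the "just use threads directly" route, here is a sketch with concurrent.futures.ThreadPoolExecutor, writing one CSV per chunk. The chunk data and file-name pattern are illustrative, not taken from the question:

```python
from concurrent.futures import ThreadPoolExecutor

def write_chunk(rows, chunk_num):
    file_name = f"chunk_{chunk_num}.csv"
    with open(file_name, "w") as f:
        for row in rows:
            f.write(row + "\n")
    return file_name

chunks = [["a,1", "b,2"], ["c,3"]]
with ThreadPoolExecutor() as pool:
    # map runs write_chunk on the pool; results come back in input order
    files = list(pool.map(write_chunk, chunks, range(1, len(chunks) + 1)))
# files == ['chunk_1.csv', 'chunk_2.csv']
```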
Answered By - dano