Issue
I am trying to load data from ODBC and write to a file at the same time using Python and asyncio, without much luck so far. I have batches of 200,000 rows coming from ODBC, and I'd like to start writing to the filesystem as soon as one batch finishes reading. The write should start without blocking the read of the next batch, but so far the writing happens at the very end instead of concurrently with the reading.
import asyncio
import time
import pandas as pd
from arrow_odbc import read_arrow_batches_from_odbc

async def writeToCsv_v5(list_tables_source):
    # start=time.time()
    connection_string='Driver=Progress OpenEdge 12.2 Driver;HOST=bla;DB=bla;PORT=9000;UID=bla;PASSWORD=bla'
    for table in list_tables_source:
        list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table])
        reader = read_arrow_batches_from_odbc(
            query='Select '+list_columns+' from PUB."'+table+'"',
            connection_string=connection_string,
            batch_size=200000,
            max_text_size=500,
        )
        i=0
        list_task=[]
        for batch in reader:
            read_start=time.time()
            # Process arrow batches
            df = batch.to_pandas()
            read_stop=time.time()
            list_task.append(asyncio.create_task(write_to_parquet_v1(df,i)))
            print("Time to read "+str(1)+"batch is :"+str(read_stop-read_start))
            i+=1
        print("finish read")
        await asyncio.gather(*list_task)

async def write_to_parquet_v1(df: pd.DataFrame, i: int):
    # note: path and table are assumed to be defined at module level
    print("Start task : "+str(i))
    df=df.applymap(str)
    write_start=time.time()
    df.to_parquet(path+table+"_"+str(i)+".parquet",engine='fastparquet')
    write_stop=time.time()
    print("Time to write to file "+str(1)+"batch is :"+str(write_stop-write_start))

asyncio.run(writeToCsv_v5(list_tables_columns))
So the outcome is the following:
Time to read 1batch is :0.1856553554534912
Time to read 1batch is :0.173567533493042
Time to read 1batch is :0.1639997959136963
Time to read 1batch is :0.1889955997467041
Time to read 1batch is :0.14117860794067383
Time to read 1batch is :0.14899969100952148
Time to read 1batch is :0.14300036430358887
Time to read 1batch is :0.1419994831085205
Time to read 1batch is :0.14099979400634766
Time to read 1batch is :0.1399984359741211
Time to read 1batch is :0.1340029239654541
Finish reading:
Start task : 0
Time to write to file 1batch is :0.440692663192749
Start task : 1
Time to write to file 1batch is :0.23961353302001953
Start task : 2
Time to write to file 1batch is :0.2606239318847656
Start task : 3
Time to write to file 1batch is :0.2226719856262207
Start task : 4
Time to write to file 1batch is :0.21300196647644043
Start task : 5
Time to write to file 1batch is :0.22265386581420898
Start task : 6
Time to write to file 1batch is :0.22600030899047852
Start task : 7
Time to write to file 1batch is :0.28110337257385254
Start task : 8
Time to write to file 1batch is :0.21700191497802734
Start task : 9
Time to write to file 1batch is :0.24160361289978027
Start task : 10
Time to write to file 1batch is :0.26999974250793457
This shows that the write part only starts when all the reads are finished.
Edit: I used a hack, await asyncio.sleep(0), so the write process starts immediately. My new code is now:
async def writeToCsv_v6(list_tables_source):
    # start=time.time()
    connection_string='secret'
    for table in list_tables_source:
        list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table])
        #rows = cursor.execute('Select '+list_columns+' from PUB."'+table+'"')
        reader = read_arrow_batches_from_odbc(
            query='Select '+list_columns+' from PUB."'+table+'"',
            connection_string=connection_string,
            batch_size=200000,
            max_text_size=500,
        )
        i=0
        list_tasks=[]
        for batch in reader:
            read_start=time.time()
            # Process arrow batches
            df = batch.to_pandas()
            read_stop=time.time()
            task=asyncio.create_task(write_to_parquet_v2(df,i))
            await asyncio.sleep(0)
            list_tasks.append(task)
            print("Time to read "+str(i)+"batch is :"+str(read_stop-read_start))
            i+=1
            # test
            # if i>10:
            #     break
        print("finish read")
        await asyncio.gather(*list_tasks)
But now the read and the write run strictly one after the other (each write completes before the next read starts), which doesn't help me much.
Solution
An async program does not really do "two things at the same time" the way you are expecting - it is not multi-threading.
It just offers a convenient way for one part of a program to run while another part waits for something to finish - and you always have to tell it where the code should pause to "await" something; those pauses are when other parts of the program get to run (the small example below shows this).
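To see that cooperative behavior in isolation, here is a minimal, self-contained sketch (the names are illustrative, not from your code):

import asyncio

async def side_task():
    print("side task runs only when the main coroutine awaits")

async def main():
    asyncio.create_task(side_task())  # scheduled, but not running yet
    print("still in main - the scheduled task has not started")
    await asyncio.sleep(0)  # yield control to the event loop once
    print("back in main - the task ran during the await")

asyncio.run(main())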
That said, creating a task is not enough to set the code in motion - it just schedules the code to be run. It is the call to asyncio.gather which actually makes the asyncio loop run through all scheduled tasks (gather itself returns when the tasks it received as parameters are done, but meanwhile it cycles through all existing asyncio tasks: those passed to it as parameters and those that were not).
So, having the code write to a file after each batch is just a matter of moving that gather call inside the for-loop that iterates through batches (a sketch follows below). That will write each batch of 200,000 rows as it arrives. Or, as you found out, sprinkling some await asyncio.sleep(0) in the middle of the code will also make asyncio loop through all created tasks once. But that alone can't make things run at the same time: both your read_arrow_batches_from_odbc and write_to_parquet_v2 are synchronous functions, which means they won't hand execution back to the asyncio event loop while they wait for I/O - they simply run to completion: one batch is fully fetched before the write of that batch can take place, and only then will the ODBC driver fetch the next batch.
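A minimal sketch of that gather-inside-the-loop variant, reusing the names from the question (note it alternates reading and writing rather than overlapping them):

i = 0
for batch in reader:
    df = batch.to_pandas()
    # gather here: this batch is written before the next one is fetched
    await asyncio.gather(write_to_parquet_v1(df, i))
    i += 1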
As can be seen, you could just as well drop asyncio from this code altogether - that is the simpler way to get your job done. Otherwise, it can be changed to work in a multi-threaded way without many changes. Making it work properly in parallel with asyncio alone would require async versions of your ODBC driver and of the parquet writer. What can be done to make it "look like asyncio" (and this is what a lot, maybe the majority, of asyncio Python code does) is to run these tasks in other threads, and just use asyncio to coordinate the calls that fetch data (a sketch with asyncio.to_thread follows below). So a "fake" asyncio would be doable here - but the resulting code would possibly be more complex than just having one thread to get data in, one to get data out, and coordinating things from the main thread.
(You'd need to write an async generator as a wrapper around the queue-communication code to get the input data, for example - and that wrapper is just an extra layer that is not needed.)
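For illustration only, that thread-backed "looks like asyncio" style could be sketched with asyncio.to_thread (Python 3.9+), assuming write_to_parquet_v2 is a plain synchronous function as in the threaded version below:

import asyncio

async def pipeline(reader):
    it = iter(reader)
    pending = []
    i = 0
    while True:
        # fetch the next batch in a worker thread so the event loop stays free
        batch = await asyncio.to_thread(next, it, None)
        if batch is None:
            break
        df = batch.to_pandas()
        # write in another worker thread while the next fetch proceeds
        pending.append(asyncio.create_task(asyncio.to_thread(write_to_parquet_v2, df, i)))
        i += 1
    await asyncio.gather(*pending)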
I will rewrite your code as simply multi-threaded instead. The "big" difference is that each capability has to live in a separate function, so that Python can run each function in a different thread. With asyncio code written from the bottom up, with working async libs and so on, it would be possible to fetch and write the data in a single function as you did: but that is not necessarily more readable, and most of the time you will need helper functions anyway.
from threading import Thread
from queue import Queue

_sentinel = object()

def writeToCsv_v6(list_tables_source, data_queue):
    # start=time.time()
    connection_string='secret'
    for table in list_tables_source:
        list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table])
        #rows = cursor.execute('Select '+list_columns+' from PUB."'+table+'"')
        reader = read_arrow_batches_from_odbc(
            query='Select '+list_columns+' from PUB."'+table+'"',
            connection_string=connection_string,
            batch_size=200000,
            max_text_size=500,
        )
        i=0
        for batch in reader:
            read_start=time.time()
            # Process arrow batches
            df = batch.to_pandas()
            read_stop=time.time()
            data_queue.put((df, i))
            print("Time to read "+str(i)+"batch is :"+str(read_stop-read_start))
            i+=1
            # test
            # if i>10:
            #     break
    data_queue.put(_sentinel)  # signal the writer that no more data is coming
    print("finish read")

def writer_loop(data_queue):
    while True:
        data = data_queue.get()
        if data is _sentinel:
            break
        df, i = data
        print(f"Writing batch {i} to disk")
        write_to_parquet_v2(df, i)

def coordinate(list_tables_source):
    data_queue = Queue()
    reader = Thread(target=writeToCsv_v6, args=(list_tables_source, data_queue))
    writer = Thread(target=writer_loop, args=(data_queue,))
    reader.start()
    writer.start()
    reader.join()
    writer.join()
    print("all done")
(As a side note: Python does allow any amount of indentation to count as an indented block - but 4 spaces is pretty much the standard these days, and it is nicer to the people who will collaborate with you on your code.)
Answered By - jsbueno