Issue
I was hoping to use aioodbc with an async Semaphore to insert rows into a database. The below will write some rows into the destination database, but appears to lock up around Sempahore value
+1. Any suggestion on how to rework this or address the block/contention?
Table definition:
create table async_testing (
insert_id int null
)
Async Code:
import asyncio
import aioodbc
loop = asyncio.get_event_loop()
async def odbc_insert_worker(semaphore, value, conn):
await semaphore.acquire()
print("Acquire Semaphore")
async with conn.cursor() as cur:
await cur.execute('INSERT INTO async_testing VALUES (?)', value)
print("Release Semaphore")
semaphore.release()
async def db_main(loop, values):
dsn="foo"
values = list(values)
db_semaphore = asyncio.Semaphore(value=15)
async with aioodbc.create_pool(dsn=dsn, loop=loop, autocommit=True) as pool:
async with pool.acquire() as conn:
tasks = [odbc_insert_worker(db_semaphore, value, conn) for value in values]
await asyncio.gather(*tasks)
fmt_vals = range(0,1000)
loop.run_until_complete(db_main(loop, fmt_vals))
Solution
Thanks to the help of @jettify in the aiolibs channel this solution works:
import asyncio
import aioodbc
from concurrent.futures import ThreadPoolExecutor
loop = asyncio.get_event_loop()
async def odbc_insert_worker(conn, value):
async with conn.cursor() as cur:
await cur.execute('insert into async_testing values (?)', value)
async def db_main(loop, values):
dsn="foo"
values = list(values)
async with aioodbc.create_pool(dsn=dsn, loop=loop, executor=ThreadPoolExecutor(max_workers=3), autocommit=True) as pool:
tasks = [do_insert(pool, value) for value in values]
await asyncio.gather(*tasks)
async def do_insert(pool, value):
async with pool.acquire() as conn:
await odbc_insert_worker(conn, value)
fmt_vals = range(0,1000)
loop.run_until_complete(db_main(loop, fmt_vals))
Answered By - Alexander
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.