Issue
I am trying to write code leveraging async functionality of Python. I have a DB connection class where I have code for (dis)connecting with DB and also for fetching the data. Now I want to asynchronously fetch data using fetch data method based on one identifier. Code is as shown below:
import pyexasol
import pandas as pd
import logging
from typing import Iterable
import asyncio
import tqdm
class Exa(object):
def __init__(self, dsn: str = '1.2.3.4',
user: str = os.environ['UID'],
password: str = os.environ['PWD']):
self.__dsn = dsn
self.__user = user
self.__password = password
self.conn = None
def __connect(self):
if self.conn is None:
try:
self.conn = pyexasol.connect(dsn=self.__dsn, user=self.__user,
password=self.__password, encryption=True)
except Exception as e:
logging.error(f"Error in connecting with Exasol. Error is: {e}")
def __disconnect(self):
if self.conn is not None:
try:
self.conn.close()
except Exception as e:
logging.error(f"Exception in disconnecting DB. Error is {e}")
self.conn = None
def fetch(self, query: str, leave_connection_open: bool = False) -> pd.DataFrame:
# connect and execute the query
self.__connect()
try:
res = self.conn.export_to_pandas(query)
res.columns = res.columns.str.lower()
except Exception as e:
self.__disconnect()
return pd.DataFrame()
if not leave_connection_open:
self.__disconnect()
return res
def fetch_batch(self, pattern: str, replacement: Iterable,
query: str, batchsize: int = 5000) -> pd.DataFrame:
res = asyncio.run(self._fetch_batch(pattern=pattern, replacement=replacement,
query=query, batchsize=batchsize))
return res
async def _fetch_batch(self, pattern: str, replacement: Iterable,
query: str, batchsize: int = 5000) -> pd.DataFrame:
replacement = list(replacement)
# breaking into batches
if any(isinstance(i, str) for i in replacement):
batches = ["'" + "','".join(replacement[i:i + batchsize]) + "'"
for i in range(0, len(replacement), batchsize)]
else:
batches = [",".join(replacement[i:i + batchsize])
for i in range(0, len(replacement), batchsize)]
# connecting and executing query in batches
nbatches = len(batches)
self.__connect()
try:
tasks = [self.__run_batch_query(query=query.replace(pattern, batches[i]),
i=i, nbatches=nbatches) for i in range(nbatches)]
# progress bar
res = [await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))]
except Exception as e:
logging.error("Could not fetch batches of data. Error is: %s", e)
'''finally:
self.__disconnect()'''
# dataframe concatenation
res = pd.concat(res)
res.columns = res.columns.str.lower()
return res
async def __run_batch_query(self, query: str,
i: int, nbatches: int) -> pd.DataFrame:
logging.info("Fetching %d/%d", i + 1, nbatches)
async with self.fetch(query=query, leave_connection_open=True) as resp:
raw = await resp
return raw
I am running this code with:
from foo import Exa
db = Exa()
ids = db.fetch('select id from application limit 100')
ids1 = db.fetch_batch(pattern='IDS',
replacement=ids['id'],
query='select id from application where id in (IDS)',
batchsize=25)
but then I get error like:
ERROR:root:Could not fetch batches of data. Error is: __aexit__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
async with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __aexit__
Also if I change __run_batch_query()
method call to self.fetch()
method without async
then error changes to:
ERROR:root:Could not fetch batches of data. Error is: __enter__
Traceback (most recent call last):
File "/home/priya/pydbutils/gitignored/foo2.py", line 85, in __run_batch_query
with self.fetch(query=query, leave_connection_open=True) as resp:
AttributeError: __enter__
please help by pointing out the mistake if any here?
Solution
pyexasol creator is here.
Please note, asyncio will not provide any benefits for Exasol-related scenarios. Ayncio runs on single CPU and utilises single network connection, which prevents any meaningful scaling.
The the most efficient ways to load data from Exasol server are:
export_to_pandas()
orexport_to_callback()
for single Python process;export_parallel()
+http_transport()
for multiple Python processes;
Please check HTTP Transport (parallel)
manual page for explanations and examples. This approach scales linearly, and you may even run the computation tasks on multiple servers.
For simple scenarios, you may consider compression=True
connection option if you transfer large amounts of data over slow (e.g. WiFi) network.
Answered By - wildraid
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.