Issue
Does the AsyncElasticsearch client open a new session for each async request?
AsyncElasticsearch (from elasticsearch-py) uses AIOHTTP. From what I understand, AIOHTTP recommends using a context manager for the aiohttp.ClientSession
object, so as not to generate a new session for each request:
    async with aiohttp.ClientSession() as session:
        ...
I'm trying to speed up my bulk ingests.
- How do I know if the AsyncElasticsearch client is using the same session, or setting up multiple?
- Do I need the above async with ... statement in my code snippet below?
    # %%------------------------------------------------------------------------------------
    # Imports
    import asyncio
    import os

    import numpy as np
    import pandas as pd
    from elasticsearch import AsyncElasticsearch, helpers
    from tqdm import tqdm

    # %%------------------------------------------------------------------------------------
    # Create async elastic client
    async_es = AsyncElasticsearch(
        hosts=[os.getenv("ELASTIC_URL")],
        verify_certs=False,
        http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
        timeout=60 * 60,
        ssl_show_warn=False,
    )

    # %%------------------------------------------------------------------------------------
    # Upload csv to elastic
    es_upl_chunk = 1000


    async def generate_actions(df_chunk):
        # Yield one bulk action per row, with username as id
        for index, record in df_chunk.iterrows():
            doc = record.replace({np.nan: None}).to_dict()
            doc.update({"_id": doc["username"], "_index": "users"})
            yield doc


    async def main(df):
        # One async_bulk call per slice of es_upl_chunk rows
        tasks = []
        for i in range(0, len(df), es_upl_chunk):
            tasks.append(
                helpers.async_bulk(
                    client=async_es,
                    actions=generate_actions(df[i : i + es_upl_chunk]),
                    chunk_size=es_upl_chunk,
                )
            )
        successes = 0
        errors = []
        print("Uploading to es...")
        progress = tqdm(unit=" docs", total=len(df))
        for task in asyncio.as_completed(tasks):
            resp = await task
            successes += resp[0]
            errors.extend(resp[1])
            progress.update(es_upl_chunk)
        progress.close()
        return successes, errors


    # Chunk files to keep memory low
    # (the columns read here must include "username", which is used as the _id above)
    with pd.read_csv(file, usecols=["attributes"], chunksize=50_000) as reader:
        for df in reader:
            responses = asyncio.run(main(df))
            print(f"Uploaded {responses[0]} documents from {file}")
            if len(responses[1]) > 0:
                # Errors come back as dicts, so convert them to str before joining
                print(
                    f"WARNING: Encountered the following errors: {','.join(map(str, responses[1]))}"
                )
Solution
It turns out AsyncElasticsearch was not the right client for speeding up bulk ingests in this case. I now use the helpers.parallel_bulk() function instead.
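For anyone wanting to try the same switch, here is a minimal sketch of how helpers.parallel_bulk() could be wired up. It is not the exact code from my project: the function names ingest and generate_actions, the thread_count and chunk_size values, and the plain-dict record format are all assumptions for illustration. The client passed in is assumed to be a synchronous elasticsearch.Elasticsearch instance, since parallel_bulk runs on a thread pool rather than on asyncio.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def generate_actions(
    records: Iterable[Dict[str, Any]],
    index: str = "users",
    id_field: str = "username",
) -> Iterator[Dict[str, Any]]:
    """Yield one bulk action per record, using id_field as the document _id."""
    for record in records:
        yield {"_index": index, "_id": record[id_field], "_source": record}


def ingest(
    client: Any,
    records: Iterable[Dict[str, Any]],
    thread_count: int = 4,
    chunk_size: int = 1000,
) -> Tuple[int, List[Any]]:
    """Index records with helpers.parallel_bulk and return (successes, errors).

    `client` is assumed to be a synchronous elasticsearch.Elasticsearch instance.
    """
    # Imported here so generate_actions stays usable without elasticsearch-py.
    from elasticsearch import helpers

    successes = 0
    errors: List[Any] = []
    # parallel_bulk is a lazy generator: nothing is sent until it is iterated.
    for ok, item in helpers.parallel_bulk(
        client,
        generate_actions(records),
        thread_count=thread_count,
        chunk_size=chunk_size,
        raise_on_error=False,  # collect failed items instead of raising
    ):
        if ok:
            successes += 1
        else:
            errors.append(item)
    return successes, errors
```

With pandas, each chunk from read_csv could be passed as ingest(es, df.to_dict("records")), where es is a hypothetical Elasticsearch client created with the same connection settings as above. NaN values would still need replacing with None first, as in the question's code.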
Answered By - j7skov