Issue
I get the following error:
Fatal Python error: Cannot recover from MemoryErrors while normalizing exceptions.

Current thread 0x0000ffff88de5010 (most recent call first):
  File "test.py", line 173 in wrap_get_fuzzy_match
  File "/usr/lib64/python3.7/asyncio/events.py", line 88 in _run
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 1786 in _run_once
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 541 in run_forever
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 574 in run_until_complete
  File "test.py", line 224 in
Aborted
async def get_valuation(url, params, api_header, session, semaphore):
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status
            try:
                if status_code != 200:
                    mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex

async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err
async def main(headers, file):
    tasks = []
    sema = asyncio.Semaphore(500)
    BATCH_SIZE = 1000000
    async with ClientSession() as session:
        with open(file) as f:
            while True:
                batch = [line.strip('\n') for line in islice(f, BATCH_SIZE)]
                if not batch:
                    break
                for param in batch:
                    # A task is queued for every input line before any response is
                    # awaited, so `tasks` grows with the size of the whole file.
                    task = asyncio.ensure_future(wrap_get_fuzzy_match(
                        get_valuation,
                        url=API + param,
                        params=param,
                        api_header=headers,
                        session=session,
                        semaphore=sema,
                    ))
                    tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses
Solution
I solved the issue by splitting the input into chunks and calling the main function once per chunk in a loop, so only one batch of tasks is held in memory at a time.
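The driver code below calls a divide_chunks helper that is not shown in the answer. A minimal sketch of such a helper, assuming it simply yields fixed-size slices of a list (the name and keyword arguments match the call in the code below; the body is my assumption):

def divide_chunks(big_list, chunk_size):
    # Sketch only: yield successive chunk_size-sized slices of big_list.
    # The original helper's implementation is not shown in the answer.
    for start in range(0, len(big_list), chunk_size):
        yield big_list[start:start + chunk_size]

# Example: list(divide_chunks(big_list=[1, 2, 3, 4, 5], chunk_size=2)) -> [[1, 2], [3, 4], [5]]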
import asyncio

from aiohttp import ClientSession
from async_timeout import timeout  # assumption: `timeout` comes from the async-timeout package

# LOGGER, API, INPUT, hdr and get_best_match are defined elsewhere in the original script.

async def get_valuation(url, params, api_header, session, semaphore):
    """
    Call fuzzy match api
    :param url:
    :param params:
    :param api_header:
    :param session:
    :param semaphore:
    :return:
    """
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status
            try:
                if status_code != 200:
                    mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex

async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err
async def main(params, headers):
    tasks = []
    sema = asyncio.Semaphore(100)
    async with ClientSession() as session:
        async with timeout(None):
            LOGGER.info(f"Number of urls to process: {len(params)}")
            for param in params:
                task = asyncio.ensure_future(wrap_get_fuzzy_match(
                    get_valuation,
                    url=API,
                    params=param,
                    api_header=headers,
                    session=session,
                    semaphore=sema,
                ))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
            return responses
if __name__ == '__main__':
    LOGGER.info("Start Processing")
    BATCH_SIZE = <size of each batch>  # choose a batch size that one event-loop run can hold in memory
    loop = asyncio.get_event_loop()
    try:
        with open(INPUT) as file:
            inputs = file.readlines()
    except IOError:
        LOGGER.exception("Unable to read valuation input file")
        raise
    chunked_list = list(divide_chunks(big_list=inputs, chunk_size=BATCH_SIZE))
    LOGGER.info(f"Chunked size- {len(chunked_list)}")
    batch_counter = 0
    for params in chunked_list:
        batch_counter += 1
        LOGGER.info(f"Starting batch number [{batch_counter}] out of [{len(chunked_list)}]")
        # Each batch is run to completion before the next one starts,
        # so only one batch of tasks is alive at a time.
        results = loop.run_until_complete(
            asyncio.ensure_future(
                main(params=params, headers=hdr)
            )
        )
    LOGGER.info("Processing Completed!!")
    loop.close()
Answered By - Smaurya