Issue
I have a large list of companies and am calling a REST API to get the daily stock price for each company. The results are stored in a PostgreSQL database. The core function looks as follows:
import asyncio
import datetime

import asyncpg
import pandas as pd
import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

async def get_data_asynchronous():
    conn = await asyncpg.connect(**DBConn)  # DBConn holds the connection parameters
    path = 'path'
    source = pd.read_excel(io=path + 'companies.xlsx', sheet_name='data')
    retries = Retry(total=2, backoff_factor=1, status_forcelist=[404, 502, 503, 504])
    dates = pd.date_range('2015-01-01', '2019-12-01', freq='D').strftime('%d-%m-%Y').tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            session.mount('https://', HTTPAdapter(max_retries=retries))
            loop = asyncio.get_event_loop()
            for index, inputrow in source.iterrows():
                try:
                    if int(inputrow['rowid']) > 0:
                        compid = inputrow['compid'].lower().strip()
                        # One task per pair of consecutive dates
                        # (start at 1 so dates[i - 1] does not wrap to the last date)
                        tasks = [
                            loop.run_in_executor(
                                executor,
                                fetch,
                                *(session, compid,
                                  datetime.datetime.strptime(str(dates[i - 1]), '%d-%m-%Y'),
                                  datetime.datetime.strptime(str(dates[i]), '%d-%m-%Y'))
                            )
                            for i in range(1, len(dates))
                        ]
                        for content in await asyncio.gather(*tasks):
                            if content is not None:
                                for data in content:
                                    compid = data.get('compid', '')
                                    date = data.get('date', '')
                                    stock_price = data.get('sprice', '')
                                    try:
                                        await conn.execute('''
                                            INSERT INTO comp_dailyhistory VALUES($1, $2, $3)
                                        ''', compid, date, stock_price)
                                    except Exception as e:
                                        print('ERROR')
                except Exception as e:
                    print(str(e))
In the above function, I first get the list of companies from an Excel worksheet (source) and create a list of dates. Since there are more than 200k companies in my list, I create a ThreadPoolExecutor with up to 10 workers. The aim is to pass each company id (compid) and two consecutive dates from the date range to a fetch function asynchronously, so as to speed up the whole data collection process. The fetch function looks as follows:
def fetch(session, compid, start, stop):
    base_url = 'baseurl'
    try:
        url = (base_url + 'compid=' + compid
               + '&begin=' + str(int(start.timestamp()))
               + '&end=' + str(int(stop.timestamp())))
        with session.get(url, timeout=None) as data:
            content = []
            if data.status_code == 200:
                for item in data.json():
                    # Take the date and price out of each JSON record
                    ret = {'compid': compid,
                           'date': str(item.get('date', '')),
                           'sprice': item.get('sprice', '')}
                    content.append(ret)
                return content
            else:
                return None
    except Exception as e:
        return None
The fetch function uses requests.get to retrieve a list of stock prices for the company between the start and stop dates, parses the JSON response into a list of key-value pairs, and returns them to the calling function. The returned lists are then picked up by asyncio.gather in the calling function, where each stock price is stored in PostgreSQL using asyncpg. The rest of the code is as follows:
def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
The main issue with this setup is that the script does not seem to pick up the full set of prices for a given company. For example, for compid = 1 there should be exactly 600 daily prices, yet I get a different result each time the script runs, and it is always lower than the true count: 550 daily prices in the first run, 570 in the second, 540 in the third, and so on.
Why is my script unable to pick up the full list of 600 daily prices? Are some of my requests getting dropped somehow? I tried an alternative using aiohttp, but haven't made much progress.
I have no experience with multithreaded programming, especially with asyncio, and would really appreciate any help in this regard. Thanks in advance for your time.
Solution
I have done several projects involving scraping websites to obtain thousands of stock prices each day. The problem, as dano suggested, is related to your error handling:
except Exception as e:
    return None
This silently discards failed requests instead of handling them. You can append the failed URLs to a list and, at the end of your script, run your "get" function again with those URLs. If the information is critical, you can even define a function that tries 5-10 times to download a stock's data before it finally returns None, as in the sketch below.
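As a minimal sketch of that idea, here is a retrying wrapper around the fetch function from the question (MAX_ATTEMPTS, failed_requests, and fetch_with_retry are illustrative names, not part of the original code); you would pass fetch_with_retry to run_in_executor instead of fetch:

import time

MAX_ATTEMPTS = 5      # how many tries before giving up; tune to taste
failed_requests = []  # (compid, start, stop) tuples that never succeeded

def fetch_with_retry(session, compid, start, stop):
    """Call fetch() up to MAX_ATTEMPTS times before recording a failure."""
    for attempt in range(MAX_ATTEMPTS):
        content = fetch(session, compid, start, stop)
        if content is not None:
            return content
        time.sleep(2 ** attempt)  # simple exponential backoff between tries
    # remember the failure so a second pass can re-run just these requests
    failed_requests.append((compid, start, stop))
    return None

Appending to a plain list from several worker threads is safe here because list.append is atomic in CPython; once the first pass finishes, you can loop over failed_requests and retry just those date ranges.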
More related to the multithreading question: you need to be careful with the number of requests per second/minute/hour and avoid exceeding the API's or website's rate limit; exceeding it is a common reason for requests being silently dropped. You can use multiple proxies to spread the load. A simple throttling sketch follows below.
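Here is a minimal, thread-safe throttle that could sit in front of fetch (the RateLimiter class and the 10-requests-per-second figure are assumptions for illustration; check the real limit in the API's documentation):

import threading
import time

class RateLimiter:
    """Allow at most max_calls calls in any sliding window of period seconds."""
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.lock = threading.Lock()
        self.calls = []  # timestamps of recent calls

    def wait(self):
        with self.lock:
            now = time.monotonic()
            # keep only timestamps still inside the window
            self.calls = [t for t in self.calls if now - t < self.period]
            if len(self.calls) >= self.max_calls:
                # sleep until the oldest call leaves the window
                time.sleep(self.period - (now - self.calls[0]))
                self.calls.pop(0)
            self.calls.append(time.monotonic())

limiter = RateLimiter(max_calls=10, period=1.0)  # assumed limit: 10 requests/second

def fetch_throttled(session, compid, start, stop):
    limiter.wait()
    return fetch(session, compid, start, stop)

Holding the lock while sleeping is deliberate: it turns the limiter into a single bottleneck that every worker thread must pass through, which is exactly what you want when the goal is to slow requests down.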
Hope it helps.
Answered By - Étienne Célèry