Issue
I've tried using both httpx and aiohttp, and both seem to have this hard-coded limit of 100 concurrent requests.
import asyncio
import aiohttp
import httpx


async def main():
    client = aiohttp.ClientSession()
    # client = httpx.AsyncClient(timeout=None)
    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        for _ in range(500)
    ]
    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())
Output -
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99
And it's just stuck at 99 with both libraries.
This doesn't happen if a new Session is used for every request though.
What am I doing wrong? Isn't the whole point of asyncio to make things like this easy?
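For reference, the "new Session per request" version that does complete looks roughly like this (a sketch of what I described above, not the exact code; fetch_with_own_session is just an illustrative name, and it never closes the sessions it creates):

import asyncio
import aiohttp


async def fetch_with_own_session():
    # A fresh ClientSession per request. Each session brings its own
    # connection pool, so no shared 100-connection limit is ever hit,
    # but the sessions and responses are simply leaked.
    client = aiohttp.ClientSession()
    return await client.get(
        "https://query1.finance.yahoo.com/v8/finance/chart/",
        params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
    )


async def main():
    coros = [fetch_with_own_session() for _ in range(500)]
    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())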
I tried rewriting this with threads, zmq, and requests, and it works great -
import requests
import zmq
from threading import Thread

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    client = requests.Session()

    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")

    while True:
        if not pull.recv_pyobj():
            return
        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        push.send_pyobj(r.content)


def ventilator():
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")

    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)

    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)


# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")
for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()
Solution
The problem is that client.get(...) returns a response object that holds a live handle to an OS-level socket. Failing to close that object results in aiohttp running out of sockets, i.e. hitting the connector limit, which is 100 by default.
To fix the problem, you need to close the object returned by client.get(), or use async with, which will ensure that the object gets closed as soon as the with block is done. For example:
async def get(client):
    async with client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"}) as resp:
        pass


async def main():
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)


asyncio.run(main())
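If you prefer not to use async with, the response can also be closed explicitly; something along these lines should work (fetch_and_close is just an illustrative name):

async def fetch_and_close(client):
    resp = await client.get(
        "https://query1.finance.yahoo.com/v8/finance/chart/",
        params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
    )
    try:
        # reading the full body also lets aiohttp return the connection to the pool
        return await resp.read()
    finally:
        # make sure the connection is freed even if reading fails
        resp.close()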
Additionally, the aiohttp.ClientSession object should also be closed, which can also be accomplished using async with, as shown above.
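As an aside, the limit of 100 comes from the session's connector, so if you genuinely need more simultaneous connections you can raise it when creating the session. A minimal sketch (limit=200 is an arbitrary example, and raising the limit is no substitute for closing responses):

import asyncio
import aiohttp


async def main():
    # the default TCPConnector allows 100 simultaneous connections
    connector = aiohttp.TCPConnector(limit=200)
    async with aiohttp.ClientSession(connector=connector) as client:
        async with client.get("https://query1.finance.yahoo.com/v8/finance/chart/",
                              params={"symbol": "ADANIENT.NS"}) as resp:
            print(resp.status)


asyncio.run(main())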
Answered By - user4815162342