Issue
I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million in one go, I tried to download them in parts of about 10 million at a time using the following code, but it only handles 20,000 at a time (anything more throws an error), so downloading that much data becomes very time-consuming. Currently it takes 3–4 minutes to download 20,000 records, at this speed: 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]
How can I speed it up?
import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio
# NOTE(review): presumably applied so that loop.run_until_complete() can be
# called while an event loop is already running (e.g. inside Jupyter) — confirm.
nest_asyncio.apply()
async def make_numbers(numbers, _numbers):
    """Asynchronously yield the integers from ``numbers`` up to, but excluding, ``_numbers``."""
    ids = range(numbers, _numbers)
    for current in ids:
        yield current
# Record ids to request: from n (inclusive) up to q (exclusive).
n, q = 0, 10_000_000
async def fetch():
    """Request every id produced by ``make_numbers(n, q)`` concurrently.

    One ``do_get`` coroutine is created per id up front, and the coroutines
    are then awaited in completion order while tqdm shows progress.
    """
    # example
    url = "https://httpbin.org/anything/log?id="
    # NOTE(review): the default TCPConnector allows only 100 simultaneous
    # connections, so most of these requests queue for a free connection.
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare one do_get() coroutine per id
        # NOTE(review): this materializes q - n coroutines in memory at once.
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        # NOTE(review): do_get() has no return statement, so every entry of
        # `responses` is None; `responses` itself is never used afterwards.
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
async def do_get(session, url, x):
    """GET ``url + str(x)`` through ``session`` and print the response body.

    NOTE(review): the body is only printed, never returned, so callers that
    collect the awaited results receive None.
    """
    # NOTE(review): Access-Control-Allow-Origin is normally a response header,
    # and Content-Type describes a request body this GET does not send —
    # likely unnecessary here; confirm.
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }
    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)
# Run fetch() to completion and report how long it took.
s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except Exception as exc:
    # A bare `except:` hid the actual failure (and also swallowed
    # KeyboardInterrupt/SystemExit); report what went wrong instead.
    print(f"error: {exc!r}")
elapsed = time.perf_counter() - s
print(f"executed in {elapsed:0.2f} seconds.")
Traceback (most recent call last):
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
raise exceptions[0]
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
sock = await self._connect_sock(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
await self.sock_connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
return await self._proactor.connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
future.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
value = callback(transferred, key, ov)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 136, in <module>
loop.run_until_complete(fetch())
File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
result = coro.send(None)
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 88, in fetch
response = await f
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 37, in _wait_for_one
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
result = coro.throw(exc)
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 125, in do_get
async with session.get(url + str(x), headers=headers) as response:
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
self._resp = await self._coro
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
conn = await self._connector.connect(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
raise last_exc
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
transp, proto = await self._wrap_create_connection(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
Solution
Bottleneck: number of simultaneous connections
First, the bottleneck is the total number of simultaneous connections in the TCP connector.
The default for `aiohttp.TCPConnector` is `limit=100`. On most systems (tested on macOS), you should be able to double that by passing a connector with `limit=200`:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
The time taken should decrease significantly. (On macOS: with `q = 20_000` it decreased 43%, from 58 seconds to 33 seconds, and with `q = 10_000` it decreased 42%, from 31 to 18 seconds.)
The `limit` you can configure depends on the number of file descriptors that your machine can open. (On macOS: you can run `ulimit -n` to check, and `ulimit -n 1024` to increase it to 1024 for the current terminal session, and then change to `limit=1000`. Compared to `limit=100`, `q = 20_000` decreased 76% to 14 seconds, and `q = 10_000` decreased 71% to 9 seconds.)
Supporting 50 million requests: async generators
Next, 50 million requests appear to hang simply because of their sheer number.
Just creating 10 million coroutines in `post_tasks` takes 68–98 seconds (it varies greatly on my machine), and the event loop is then further burdened with that many tasks, 99.99% of which are blocked waiting for the TCP connection pool.
We can defer the creation of coroutines using an async generator:
async def make_async_gen(f, n, q):
    """Lazily yield ``f(i)`` for each i produced by ``make_numbers(n, q)``.

    ``f`` is only called when the consumer asks for the next item, so
    coroutines are created on demand rather than all up front.
    """
    numbers = make_numbers(n, q)
    async for number in numbers:
        yield f(number)
We need a counterpart to `asyncio.as_completed()` that handles `async_gen` and `concurrency`:
from asyncio import ensure_future, events, gather
from asyncio.queues import Queue


def as_completed_for_async_gen(fs_async_gen, concurrency):
    """Yield awaitables for the results of coroutines produced by an async generator.

    Counterpart of ``asyncio.as_completed()`` for an async generator of
    coroutines/futures. At most ``concurrency`` of them are started up front,
    and each completion starts the next one, so at most ``concurrency`` are
    in flight and the whole sequence never has to exist at once.

    Each yielded object is a coroutine; awaiting it returns the result of the
    next future to finish (or raises its exception). Results therefore arrive
    in completion order, not generation order.

    The first ``next()`` primes the pool with ``loop.run_until_complete()``.
    When iterated from inside an already-running loop, that relies on
    nest_asyncio (or similar) having been applied.

    Args:
        fs_async_gen: async generator/iterator yielding coroutines or futures.
        concurrency: maximum number of them in flight at once.
    """
    done = Queue()
    loop = events.get_event_loop()
    todo = set()     # started futures that have not finished yet
    refills = set()  # pending _add_next() tasks; strong refs so they are not GC'd

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        # Refill the freed slot right away so the pool stays full even while
        # the consumer is still busy with earlier results.
        task = loop.create_task(_add_next())
        refills.add(task)
        task.add_done_callback(refills.discard)

    async def _wait_for_one():
        f = await done.get()
        # A refill scheduled by _on_completion may not have run yet. If nothing
        # is in flight, wait for pending refills so the generator's loop check
        # sees every future that will ever be started; otherwise it could stop
        # while items remain. Errors raised by the source generator during a
        # refill surface here.
        while not todo and refills:
            await gather(*refills)
        return f.result()

    async def _add_next():
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    for _ in range(concurrency):
        loop.run_until_complete(_add_next())
    # A result is outstanding if it is still in flight (todo) or finished but
    # not yet handed out (done); checking only `todo` drops results that
    # finished in the same event-loop iteration.
    while todo or not done.empty():
        yield _wait_for_one()
Then, we update `fetch()`:
from functools import partial

# Maximum number of requests in flight; also the connection-pool size.
CONCURRENCY = 200
# Record ids to request: from n (inclusive) up to q (exclusive).
n = 0
q = 50_000_000


async def fetch():
    """Download records ``n`` .. ``q - 1`` with a bounded number of requests in flight.

    Request coroutines are produced lazily by an async generator and driven
    by ``as_completed_for_async_gen``, so they are never all created up front.

    Returns:
        list: the values returned by ``do_get``, in completion order (not id order).
    """
    # example
    url = "https://httpbin.org/anything/log?id="
    connector = aiohttp.TCPConnector(limit=CONCURRENCY)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Prepare the coroutine generator: do_get(session, url, x) per id.
        async_gen = make_async_gen(partial(do_get, session, url), n, q)
        # Execute them with the specified concurrency. The progress total is
        # the number of ids actually requested; `q` alone overstates it
        # whenever n > 0 (e.g. when downloading in parts).
        responses = [
            await f
            for f in tqdm.tqdm(
                as_completed_for_async_gen(async_gen, CONCURRENCY), total=q - n
            )
        ]
    return responses
Other limitations
With the above, the program can start processing 50 million requests, but:
- it will still take 8 hours or so with `CONCURRENCY = 1000`, based on the estimate from `tqdm`;
- your program may run out of memory for `responses` and crash.
For point 2, you should probably do:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
response = await f
# Do something with response, such as writing to a local file
# ...
An error in the code
`do_get()` should return `data`:
async def do_get(session, url, x):
    """Fetch ``url + str(x)`` through ``session`` and return the response body as text."""
    request_headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US",
    }
    target = url + str(x)
    async with session.get(target, headers=request_headers) as response:
        data = await response.text()
    # Return the body instead of printing it, so callers can collect it.
    return data
Answered By - aaron
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.