Issue
I'm currently designing a spider to crawl a specific website. I can do it synchronously, but I'm trying to get my head around asyncio to make it as efficient as possible. I've tried a lot of different approaches, with yield, chained functions and queues, but I can't make it work.
I'm most interested in the design and the logic needed to solve the problem. I don't necessarily need runnable code; rather, please highlight the most important aspects of asyncio. I can't post any code, because my attempts are not worth sharing.
The mission:
The site exemple.com (I know, it should be example.com) is organized in three levels: table pages link to user pages, and each user page links to several subcategory pages.
Written synchronously, the logic would look like this:
for table in my_url_list:
    # Get HTML
    # Extract urls from HTML to user_list
    for user in user_list:
        # Get HTML
        # Extract urls from HTML to user_subcat_list
        for subcat in user_subcat_list:
            # extract content
            ...
But now I would like to scrape the site asynchronously. Let's say we use 5 instances (tabs in pyppeteer or requests in aiohttp) to parse the content. How should we design it to make it as efficient as possible, and what asyncio syntax should we use?
Update
Thanks to @user4815162342, who solved my problem. I've been playing around with his solution and am posting runnable code below in case anyone else wants to experiment with asyncio.
import asyncio
import random
my_url_list = ['exemple.com/table1', 'exemple.com/table2', 'exemple.com/table3']
# Random sleeps to simulate requests to the server
async def randsleep(caller=None):
    i = random.randint(1, 6)
    if caller:
        print(f"Request HTML for {caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)
async def process_urls(url_list):
    print(f'async def process_urls: added {url_list}')
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table, limit) for table in url_list]
    await asyncio.gather(*coros)
async def process_user_list(table, limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_list
        await randsleep(table)
        if table[-1] == '1':
            user_list = ['exemple.com/user1', 'exemple.com/user2', 'exemple.com/user3']
        elif table[-1] == '2':
            user_list = ['exemple.com/user4', 'exemple.com/user5', 'exemple.com/user6']
        else:
            user_list = ['exemple.com/user7', 'exemple.com/user8', 'exemple.com/user9']
        print(f'async def process_user_list: Extracted {user_list} from {table}')
    # Execute process_user in parallel, but do so outside the `async with`
    # because process_user will also need the semaphore, and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user, limit) for user in user_list]
    await asyncio.gather(*coros)
async def process_user(user, limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_subcat_list
        await randsleep(user)
        user_subcat_list = [user + '/profile', user + '/info', user + '/followers']
        print(f'async def process_user: Extracted {user_subcat_list} from {user}')
    coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)
async def process_subcat(subcat, limit):
    async with limit:
        # Simulate HTML request and extracting content
        await randsleep(subcat)
        print(f'async def process_subcat: Extracted content from {subcat}')
if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))
Solution
Let's restructure the sync code so that each piece that can access the network is in a separate function. The functionality is unchanged, but it will make things easier later:
def process_urls(url_list):
    for table in url_list:
        process_user_list(table)
def process_user_list(table):
    # Get HTML, extract user_list
    for user in user_list:
        process_user(user)
def process_user(user):
    # Get HTML, extract user_subcat_list
    for subcat in user_subcat_list:
        process_subcat(subcat)
def process_subcat(subcat):
    # get HTML, extract content
    ...
if __name__ == '__main__':
    process_urls(my_url_list)
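To have a concrete baseline for the async versions, here is a minimal runnable sketch of the restructured sync code. It assumes a made-up in-memory site (the hypothetical `FAKE_SITE` dict) and a hypothetical `fetch_links()` helper that sleeps to imitate a blocking HTTP request; real code would download and parse HTML instead. Every request waits for the previous one, so the total time is the sum of all request times.

```python
import time

# Hypothetical stand-in for the site: table pages and the user links found on them.
FAKE_SITE = {
    'exemple.com/table1': ['exemple.com/user1', 'exemple.com/user2'],
    'exemple.com/table2': ['exemple.com/user3'],
}
REQUEST_DELAY = 0.01  # simulated network latency per request, in seconds
visited = []          # every URL "fetched", in the order it was fetched


def fetch_links(url):
    """Simulated blocking request: sleep, then return the links on the page."""
    time.sleep(REQUEST_DELAY)
    visited.append(url)
    if url in FAKE_SITE:         # table page: links to users
        return FAKE_SITE[url]
    if url.count('/') == 1:      # user page: links to subcategories
        return [url + '/profile', url + '/info', url + '/followers']
    return []                    # subcategory page: content only, no links


def process_urls(url_list):
    for table in url_list:
        process_user_list(table)


def process_user_list(table):
    for user in fetch_links(table):
        process_user(user)


def process_user(user):
    for subcat in fetch_links(user):
        process_subcat(subcat)


def process_subcat(subcat):
    fetch_links(subcat)  # "extract content" is just one more simulated request here


if __name__ == '__main__':
    start = time.perf_counter()
    process_urls(list(FAKE_SITE))
    print(f'{len(visited)} requests in {time.perf_counter() - start:.2f}s')
```

With 14 simulated requests of 0.01 s each, the run takes at least 0.14 s, because nothing overlaps.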
Assuming that the order of processing doesn't matter, we'd like the async version to run all the functions that are now called in for loops in parallel. They'll still run on a single thread, but they will await anything that might block, allowing the event loop to parallelize the waiting and drive them to completion by resuming each coroutine whenever it is ready to proceed. This is achieved by spawning each coroutine as a separate task that runs independently of other tasks and therefore in parallel. By contrast, a sequential (but still async) version of process_urls would look like this:
async def process_urls(url_list):
    for table in url_list:
        await process_user_list(table)
This is async because it is running inside an event loop, and you could run several such functions in parallel (which we'll show how to do shortly), but it's also sequential because it chooses to await each invocation of process_user_list. At each loop iteration the await explicitly instructs asyncio to suspend execution of process_urls until the result of process_user_list is available.
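A small timing sketch makes the sequential behavior visible. Here process_user_list is a hypothetical stand-in that only sleeps for 0.1 s, and `timed()` is a helper introduced just for this demo. Because each call is awaited before the next one starts, three calls take about 0.3 s even though everything runs inside the event loop.

```python
import asyncio
import time


async def process_user_list(table):
    # Hypothetical stand-in for "fetch HTML and process users": just waits 0.1 s.
    await asyncio.sleep(0.1)
    return table


async def process_urls_sequential(url_list):
    results = []
    for table in url_list:
        # The await suspends process_urls_sequential until this call finishes,
        # so the next call does not even start until then.
        results.append(await process_user_list(table))
    return results


def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == '__main__':
    result, elapsed = timed(process_urls_sequential(['t1', 't2', 't3']))
    print(result, f'{elapsed:.2f}s')  # roughly 0.3 s: the three waits add up
```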
What we want instead is to tell asyncio to run all invocations of process_user_list in parallel, and to suspend execution of process_urls until they're all done. The basic primitive to spawn a coroutine in the "background" is to schedule it as a task using asyncio.create_task, which is the closest async equivalent of a light-weight thread. Using create_task, the parallel version of process_urls would look like this:
async def process_urls(url_list):
    # spawn a task for each table
    tasks = []
    for table in url_list:
        task = asyncio.create_task(process_user_list(table))
        tasks.append(task)
    # The tasks are now all spawned, so awaiting any one task lets
    # them all run.
    for task in tasks:
        await task
At first glance the second loop looks like it awaits tasks in sequence like the previous version, but this is not the case. Since each await suspends to the event loop, awaiting any task allows all tasks to progress, as long as they were scheduled beforehand using create_task(). The total waiting time will be roughly that of the longest task, regardless of the order in which they finish.
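The following sketch checks that claim with hypothetical tasks of different lengths (0.3 s, 0.1 s and 0.2 s of simulated work, via a stand-in process_user_list that takes an extra `delay` parameter). All tasks are created before the first await, so the loop that awaits them one by one still finishes in about 0.3 s rather than 0.6 s, and the results come back in the order the tasks were created.

```python
import asyncio
import time


async def process_user_list(table, delay):
    # Hypothetical stand-in for a network-bound step that takes `delay` seconds.
    await asyncio.sleep(delay)
    return table


async def process_urls(jobs):
    # Spawn every task first; from here on they all make progress
    # whenever the current coroutine yields to the event loop.
    tasks = [asyncio.create_task(process_user_list(t, d)) for t, d in jobs]
    results = []
    for task in tasks:
        # While we wait on this task, every other spawned task keeps running.
        results.append(await task)
    return results


if __name__ == '__main__':
    jobs = [('t1', 0.3), ('t2', 0.1), ('t3', 0.2)]
    start = time.perf_counter()
    print(asyncio.run(process_urls(jobs)))
    print(f'{time.perf_counter() - start:.2f}s')  # close to 0.3 s, not 0.6 s
```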
This pattern is used so often that asyncio has a dedicated utility function for it, asyncio.gather. Using this function the same code can be expressed in a much shorter version:
async def process_urls(url_list):
    coros = [process_user_list(table) for table in url_list]
    await asyncio.gather(*coros)
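Two properties of asyncio.gather are worth knowing when collecting scraped data; the sketch below uses a hypothetical fetch() that just sleeps and a hypothetical failing() request. First, the returned list follows the order of the arguments, not the order of completion. Second, by default the first exception raised propagates to whoever awaits gather; with return_exceptions=True, exceptions are placed in the result list instead, so one failed page does not hide the others.

```python
import asyncio


async def fetch(name, delay):
    # Hypothetical request that completes after `delay` seconds.
    await asyncio.sleep(delay)
    return f'{name} done'


async def failing():
    await asyncio.sleep(0.05)
    raise ValueError('HTTP 500')  # hypothetical failed request


async def main():
    # 'a' finishes last, yet its result still comes first:
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(fetch('a', 0.2), fetch('b', 0.0), fetch('c', 0.1))


async def main_tolerant():
    # The exception object is returned in place of a result instead of being raised.
    return await asyncio.gather(fetch('a', 0.1), failing(), return_exceptions=True)


if __name__ == '__main__':
    print(asyncio.run(main()))           # ['a done', 'b done', 'c done']
    print(asyncio.run(main_tolerant()))
```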
But there is another thing to take care of: process_user_list will fetch HTML from the server, and since many instances of it will run in parallel, we must not let them hammer the server with hundreds of simultaneous connections. We could create a pool of worker tasks and some sort of queue, but asyncio offers a more elegant solution: the semaphore. A semaphore is a synchronization device that allows no more than a predetermined number of holders at a time, making the rest wait in line.
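To see the limit in action, the sketch below starts 20 hypothetical requests (fetch() only sleeps 0.05 s) behind an asyncio.Semaphore(5) and records how many are inside the `async with` block at the same time. The `stats` dict and the `exemple.com/page{i}` URLs exist only for this demo. The peak never exceeds 5, and the 20 requests finish in about four "rounds".

```python
import asyncio

stats = {'current': 0, 'peak': 0}  # bookkeeping for the demo only


async def fetch(url, limit):
    async with limit:
        # At most `max_parallel` coroutines can be inside this block at once.
        stats['current'] += 1
        stats['peak'] = max(stats['peak'], stats['current'])
        await asyncio.sleep(0.05)  # simulated request
        stats['current'] -= 1
    return url


async def main(n_urls=20, max_parallel=5):
    limit = asyncio.Semaphore(max_parallel)
    urls = [f'exemple.com/page{i}' for i in range(n_urls)]
    return await asyncio.gather(*(fetch(u, limit) for u in urls))


if __name__ == '__main__':
    results = asyncio.run(main())
    print(len(results), 'pages, peak concurrency:', stats['peak'])
```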
The final version of process_urls creates a semaphore and just passes it down. It doesn't acquire the semaphore because process_urls doesn't actually fetch any HTML itself, so there is no reason for it to hold a semaphore slot while the process_user_list calls are running.
async def process_urls(url_list):
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table, limit) for table in url_list]
    await asyncio.gather(*coros)
process_user_list looks similar, but it does need to acquire the semaphore using async with:
async def process_user_list(table, limit):
    async with limit:
        # Get HTML using aiohttp, extract user_list
        ...
    # Execute process_user in parallel, but do so outside the `async with`
    # because process_user will also need the semaphore, and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user, limit) for user in user_list]
    await asyncio.gather(*coros)
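The comment about staying outside the `async with` is not only about efficiency. If a parent keeps its slot while awaiting children that need slots from the same semaphore, it can deadlock: once every slot is held by a waiting parent, no child can ever start. The sketch shows the simplest case, a hypothetical semaphore with a single slot; `parent_holding`, `parent_releasing` and `finishes()` are demo names, and asyncio.wait_for with a timeout is used only to detect the hang instead of blocking forever.

```python
import asyncio


async def child(limit):
    async with limit:
        await asyncio.sleep(0.01)  # simulated request


async def parent_holding(limit):
    async with limit:
        await asyncio.sleep(0.01)  # simulated request for the parent page
        # WRONG: children are started while this coroutine still holds a slot.
        await asyncio.gather(child(limit), child(limit))


async def parent_releasing(limit):
    async with limit:
        await asyncio.sleep(0.01)  # simulated request for the parent page
    # RIGHT: the slot is released before the children need one.
    await asyncio.gather(child(limit), child(limit))


async def finishes(parent_fn, slots=1, timeout=0.3):
    """Return True if parent_fn completes within `timeout` seconds."""
    limit = asyncio.Semaphore(slots)
    try:
        await asyncio.wait_for(parent_fn(limit), timeout)
        return True
    except asyncio.TimeoutError:
        return False


if __name__ == '__main__':
    print('releasing:', asyncio.run(finishes(parent_releasing)))  # True
    print('holding:', asyncio.run(finishes(parent_holding)))      # False (hangs)
```

With Semaphore(5) the same hang would appear once five table-level coroutines each held a slot while waiting for their users.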
process_user and process_subcat are more of the same:
async def process_user(user, limit):
    async with limit:
        # Get HTML, extract user_subcat_list
        ...
    coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)
async def process_subcat(subcat, limit):
    async with limit:
        # get HTML, extract content
        ...
    # do something with content
if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))
In practice you will probably want the async functions to share the same aiohttp session, so you'd create it in the top-level function (process_urls in your case) and pass it down along with the semaphore. Each function that fetches HTML would have another async with for the aiohttp request/response, such as:
async with limit:
    async with session.get(url, params...) as resp:
        # get HTML data here
        resp.raise_for_status()
        html = await resp.read()
# extract content from HTML data here
The two async with statements can be collapsed into one, reducing the indentation but keeping the same meaning:
async with limit, session.get(url, params...) as resp:
    # get HTML data here
    resp.raise_for_status()
    html = await resp.read()
# extract content from HTML data here
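The sketch below puts the shared-session idea and the combined async with together without needing network access. FakeSession and FakeResponse are hypothetical stand-ins that mimic only the parts of an aiohttp session and response used above (`get()` as an async context manager, `raise_for_status()`, `await read()`); with real aiohttp you would typically create the session with `async with aiohttp.ClientSession() as session:` in the top-level function. The `events` list records that the response is closed, and the semaphore slot released, before parsing starts.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records open/close/parse order for the demo


class FakeResponse:
    """Hypothetical stand-in for an aiohttp response object."""
    def __init__(self, url):
        self.url = url
        self.status = 200

    def raise_for_status(self):
        if self.status >= 400:
            raise RuntimeError(f'HTTP {self.status} for {self.url}')

    async def read(self):
        await asyncio.sleep(0.01)  # simulated download
        return f'<html>{self.url}</html>'.encode()


class FakeSession:
    """Hypothetical stand-in for a shared aiohttp.ClientSession."""
    @asynccontextmanager
    async def get(self, url):
        events.append(f'open {url}')
        try:
            yield FakeResponse(url)
        finally:
            events.append(f'close {url}')


async def fetch_content(url, session, limit):
    # Same as nesting `async with session.get(url)` inside `async with limit`.
    async with limit, session.get(url) as resp:
        resp.raise_for_status()
        html = await resp.read()
    # Response closed and semaphore slot released: parsing blocks no other request.
    events.append(f'parse {url}')
    return html.decode()


async def crawl(urls):
    session = FakeSession()        # created once at the top level...
    limit = asyncio.Semaphore(5)   # ...and passed down with the semaphore
    return await asyncio.gather(*(fetch_content(u, session, limit) for u in urls))


if __name__ == '__main__':
    print(asyncio.run(crawl(['exemple.com/a', 'exemple.com/b'])))
    print(events)
```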
Answered By - user4815162342