Issue
My producer is a local sqlitedict (very fast to iterate through). My consumer fetches web pages. I need to limit the number of concurrent web-page scrapes. So I came up with:
from scraping import xpath_soup, get_page_content, links_from_soup
import asyncio
from sqlitedict import SqliteDict
from bs4 import BeautifulSoup

DB_PUBS = SqliteDict("data/publishers.sqlite")
PUB_BATCH_SIZE = 10


async def process_publisher(publisher, semaphore):
    # Scrape all the links from the publisher URL
    page_src = await get_page_content(publisher['url'])
    if page_src is not None:
        soup = BeautifulSoup(page_src, 'html.parser')
        page_links = links_from_soup(soup, publisher['url'])
        print(f"Found {len(page_links)} links for publisher {publisher['url']}")
    semaphore.release()


async def process_publisher_queue(publisher_queue, semaphore):
    while True:
        publisher = await publisher_queue.get()
        # spawn a task to process a publisher from the queue
        asyncio.create_task(process_publisher(publisher, semaphore))


async def main():
    # Get a batch of publishers
    publisher_queue = asyncio.Queue(maxsize=PUB_BATCH_SIZE)
    # create a semaphore to limit the number of queue items being processed at a time
    semaphore = asyncio.Semaphore(PUB_BATCH_SIZE)
    # spawn a task to process the publisher queue
    asyncio.create_task(process_publisher_queue(publisher_queue, semaphore))
    for pub_url, publisher in DB_PUBS.items():
        await semaphore.acquire()
        print(f"Adding publisher {pub_url} to queue")
        await publisher_queue.put(publisher)


if __name__ == '__main__':
    asyncio.run(main())
but then I thought to myself "Why should I even need the queue?" So I changed it to:
from scraping import xpath_soup, get_page_content, links_from_soup
import asyncio
from sqlitedict import SqliteDict
from bs4 import BeautifulSoup

DB_PUBS = SqliteDict("data/publishers.sqlite")
PUB_BATCH_SIZE = 10


async def process_publisher(publisher, semaphore):
    # Scrape all the links from the publisher URL
    page_src = await get_page_content(publisher['url'])
    if page_src is not None:
        soup = BeautifulSoup(page_src, 'html.parser')
        page_links = links_from_soup(soup, publisher['url'])
        print(f"Found {len(page_links)} links for publisher {publisher['url']}")
    semaphore.release()


async def main():
    # create a semaphore to limit the number of items being processed at a time
    semaphore = asyncio.Semaphore(PUB_BATCH_SIZE)
    for pub_url, publisher in DB_PUBS.items():
        await semaphore.acquire()
        print(f"Adding publisher {pub_url} to queue")
        asyncio.create_task(process_publisher(publisher, semaphore))


if __name__ == '__main__':
    asyncio.run(main())
I then asked ChatGPT if this is a good idea and she said:
While it's technically possible to remove the queue and directly spawn tasks for each publisher, it might not be the most efficient way to process the publishers. Using an asyncio queue has some benefits, such as allowing you to limit the maximum number of items being processed at a time and providing a buffer in case the producers and consumers have different speeds.
By using the queue, you can add new publishers to the queue without worrying about how fast they are being processed, as they will be added to the queue and processed in a first-in-first-out order. If you directly spawn tasks for each publisher, there's a risk of overwhelming the system with too many tasks and consuming too much memory.
In addition, by using a queue, you can easily modify the program to use multiple consumer tasks to process the queue. This can help improve performance by allowing multiple tasks to work on the queue at the same time, which can reduce the overall processing time.
Overall, while it's technically possible to remove the queue, it might not be the most efficient way to process the publishers. Using a queue provides more control and flexibility, which can be useful in many scenarios.
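For what it's worth, I think the multi-consumer version ChatGPT describes would look roughly like this (just a sketch: NUM_CONSUMERS is an illustrative name, the actual scraping call is elided, and concurrency is capped by the number of consumers rather than a semaphore):

NUM_CONSUMERS = 10  # illustrative: concurrency is capped by the number of consumers


async def consume(queue):
    while True:
        publisher = await queue.get()
        try:
            # fetch and parse the publisher page here,
            # e.g. by awaiting something like process_publisher above
            ...
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=NUM_CONSUMERS)
    consumers = [asyncio.create_task(consume(queue)) for _ in range(NUM_CONSUMERS)]
    for pub_url, publisher in DB_PUBS.items():
        await queue.put(publisher)  # blocks while the queue is full
    await queue.join()  # wait until every queued item has been processed
    for c in consumers:
        c.cancel()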
But the semaphore already limits the maximum number of items being processed at a time. My thinking is that if I later have a second awaitable to run on each page after fetching it, then maybe I would add a queue consumer. This script will undoubtedly get larger. I'm not overlooking anything here, am I?
Solution
If you are trying to waste the least amount of time, the best way to achieve this is to use a single semaphore and hold it only during the retrieval stage. The processing stage can run separately and does not need to be blocked by the semaphore.
Another thing to note is that the recommended way to use a semaphore is via the context manager (async with semaphore:) instead of calling acquire() and release() manually.
async def gather(url):
    # Make sure to use sessions here to take
    # advantage of HTTP Keep-Alive connections.
    ...


async def process(payload):
    ...


async def task(publisher, semaphore):
    # Gather the publisher information.
    # Ensure you are only doing it N at a time.
    url = publisher['url']
    async with semaphore:
        payload = await gather(url)
    # Processing happens outside the semaphore, so it never blocks retrieval.
    processed_payload = await process(payload)
    # Do something with the newly processed payload here.


async def main():
    N = 10
    sem = asyncio.Semaphore(N)
    tasks = [task(publisher, sem) for publisher in DB_PUBS.values()]
    await asyncio.gather(*tasks)
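If the pages are fetched with aiohttp, for example, gather could share a single ClientSession so Keep-Alive connections are reused. A minimal sketch (assuming you thread the session through task as an extra argument; timeouts and error handling are omitted):

import aiohttp


async def gather(url, session):
    # The shared session pools connections, so repeated requests
    # to the same host reuse Keep-Alive connections.
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()


async def main():
    N = 10
    sem = asyncio.Semaphore(N)
    # One session for the whole run; it owns the connection pool.
    async with aiohttp.ClientSession() as session:
        tasks = [task(pub, sem, session) for pub in DB_PUBS.values()]
        await asyncio.gather(*tasks)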
Do note that even when using semaphores you'll likely still have a lot of requests being sent to the server at the same time, especially with N=10.
Answered By - felipe