Issue
I need to pass URLs from a txt file to a script that fetches the URLs and does some work with them, with the help of aiohttp and asyncio. So I use asyncio.Queue to put the URLs from the txt file into a queue. The nuance is that the txt file may be very large and may not fit in memory, so I tried to use the queue's maxsize. But when the limit is less than the number of URLs, the script gets stuck. So I have 2 questions:
- Where is my mistake?
- Maybe there is a better solution than using asyncio.Queue for this case?
script.py:
import asyncio
from aiohttp import ClientSession

async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    with open("urls.txt", 'r') as f:
        queue = asyncio.Queue(maxsize=5)
        for line in f:
            line = line.strip()
            await queue.put(line)
        # await queue.join() - tried this
        while not queue.empty():
            current_url = await queue.get()
            try:
                content = await fetch(current_url)
                print(f"{current_url}: {content[:15]}")
            except Exception as e:
                print(f"Error fetching {current_url}: {e}")
            finally:
                queue.task_done()

asyncio.run(main())
urls.txt:
https://www.google.com/
https://www.youtube.com/
https://www.facebook.com/
https://www.wikipedia.org/
https://www.amazon.com/
https://www.instagram.com/
https://www.twitter.com/
https://www.tumblr.com/
https://www.pinterest.com/
https://www.reddit.com/
In this script I use maxsize=5 and the number of URLs in the txt file is 10. I have tried to add await queue.join() after the for line in f: loop, but it didn't help. The script only works without await queue.join(), and only when maxsize is >= the number of URLs or is not specified.
Solution
Your program structure is wrong. await queue.put(line) blocks as soon as the queue holds maxsize items, and since nothing is consuming the queue while you are still filling it, the producer loop deadlocks after the first 5 URLs. To use a Queue you need one or more consumers that read from the queue in parallel with the producer. With that structure in place, asyncio.Queue with a maxsize is actually a good fit for your case: the file is read lazily line by line, so memory use stays bounded.
Here is a simple example of how you can create 3 workers that download the content as you put the URLs into the queue. At the end we stop the workers by putting None into the queue, one sentinel per worker:
import asyncio
from aiohttp import ClientSession

async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def consumer(queue):
    while True:
        current_url = await queue.get()
        try:
            if current_url is None:  # sentinel: no more work, stop this worker
                break
            content = await fetch(current_url)
            print(f"{current_url}: {content[:15].strip()}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    # for example, we create 3 workers consuming the queue
    workers = {asyncio.create_task(consumer(queue)) for _ in range(3)}
    with open("urls.txt", "r") as f:
        for line in f:
            line = line.strip()
            # blocks when the queue is full, resumes as the workers drain it
            await queue.put(line)
    # end the workers: one sentinel per worker
    for _ in range(len(workers)):
        await queue.put(None)
    await queue.join()

asyncio.run(main())
Prints:
https://www.google.com/: <!doctype html>
https://www.facebook.com/: <!DOCTYPE html>
https://www.wikipedia.org/: <!DOCTYPE html>
https://www.amazon.com/: <!doctype html>
https://www.instagram.com/: <!DOCTYPE html>
https://www.tumblr.com/: <!doctype html
https://www.youtube.com/: <!DOCTYPE html>
https://www.twitter.com/: <!DOCTYPE html>
https://www.pinterest.com/: <!DOCTYPE html>
https://www.reddit.com/: <!DOCTYPE
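One thing worth flagging in the code above: fetch() opens a brand-new ClientSession for every single URL. aiohttp recommends creating a session once and reusing it, so that its connection pool is shared across requests. A minimal sketch of the same worker-and-sentinel pattern with one shared session (keeping the 3 workers and maxsize=5 from above; this is a variant, not the answer's original code) could look like this:

import asyncio
from aiohttp import ClientSession

async def consumer(queue, session):
    # same sentinel-driven loop as above, but reusing the shared session
    while True:
        current_url = await queue.get()
        try:
            if current_url is None:
                break
            async with session.get(current_url) as response:
                content = await response.text()
            print(f"{current_url}: {content[:15].strip()}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    async with ClientSession() as session:  # one session, one connection pool
        workers = [asyncio.create_task(consumer(queue, session)) for _ in range(3)]
        with open("urls.txt", "r") as f:
            for line in f:
                await queue.put(line.strip())
        for _ in range(len(workers)):
            await queue.put(None)  # one sentinel per worker
        await queue.join()
        # wait for the workers to return before the session is closed
        await asyncio.gather(*workers)

asyncio.run(main())

The final asyncio.gather(*workers) makes sure every worker has actually exited before the async with block closes the session.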
Answered By - Andrej Kesely