Issue
I have a list of URLs that I fetch and save to HTML files with the following code:
tasksURL = []
async with aiohttp.ClientSession() as session:
    for url in listOfURLs:
        tasksURL.append(self.fetch(session, url))
    allHTMLs = await asyncio.gather(*tasksURL)
    i = 0
    for html in allHTMLs:
        i += 1
        with open("myPath.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
            f.write(html)
Since the URL list can be quite large (up to 60,000) I need to chunk these tasks.
I tried the following solution. I defined a function that chops the list into smaller chunks:
def chunkList(self, listOfURLs, n):
    """Yield successive n-sized chunks from the list."""
    for i in range(0, len(listOfURLs), n):
        yield listOfURLs[i:i + n]
And then used this function to run each chunked piece of listOfURLs like this:
tasksURL = []
chunkedListOfURLs = self.chunkList(listOfURLs, 5)
for URLList in chunkedListOfURLs:
    async with aiohttp.ClientSession() as session:
        for url in URLList:
            tasksURL.append(self.fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)
    for html in allHTMLs:
        with open("myPath.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
            f.write(html)
I'm getting this error:
RuntimeError: cannot reuse already awaited coroutine
I understand the problem but haven't found a way around it.
Solution
I would suggest using asyncio.Queue in this case. You don't want to create 60k tasks, one per URL. As for the error itself: tasksURL is never cleared between chunks, so every gather() after the first receives coroutines that were already awaited in a previous iteration. With a queue you can instead spawn a set number of workers and limit the queue size:
If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then await put() blocks when the queue reaches maxsize until an item is removed by get().
import asyncio
import random

WORKERS = 10


async def worker(q):
    while True:
        url = await q.get()
        t = random.uniform(1, 5)
        print(f"START: {url} ({t:.2f}s)")
        await asyncio.sleep(t)
        print(f"END: {url}")
        q.task_done()


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []
    for _ in range(WORKERS):
        tasks.append(asyncio.create_task(worker(q)))
    for i in range(10):
        await q.put(f"http://example.com/{i}")
    await q.join()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    asyncio.run(main())
Test:
$ python test.py
START: http://example.com/0 (1.14s)
START: http://example.com/1 (4.40s)
START: http://example.com/2 (2.48s)
START: http://example.com/3 (4.34s)
START: http://example.com/4 (1.94s)
END: http://example.com/0
START: http://example.com/5 (1.52s)
END: http://example.com/4
START: http://example.com/6 (4.84s)
END: http://example.com/2
START: http://example.com/7 (4.35s)
END: http://example.com/5
START: http://example.com/8 (2.33s)
END: http://example.com/3
START: http://example.com/9 (1.80s)
END: http://example.com/1
END: http://example.com/8
END: http://example.com/9
END: http://example.com/6
END: http://example.com/7
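If you do want to keep the chunked gather approach, building a fresh task list for each chunk fixes the reported error, since gather() then never receives a coroutine that was already awaited. A minimal sketch, reusing self.fetch and chunkList from the question:

async with aiohttp.ClientSession() as session:
    for URLList in self.chunkList(listOfURLs, 5):
        # Fresh list every iteration; appending to one shared list is what
        # caused "cannot reuse already awaited coroutine".
        tasksURL = [self.fetch(session, url) for url in URLList]
        allHTMLs = await asyncio.gather(*tasksURL)
        # ... write allHTMLs to disk here ...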
Btw, writing to files will block your main event loop; either call it via run_in_executor or use aiofiles, as sketched below.
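For the aiofiles option, a minimal sketch (assuming aiofiles is installed; the path and function name are just for illustration):

import aiofiles

async def save_to_disk(path, html):
    # aiofiles runs the blocking file I/O in a thread pool behind an async
    # interface, so the write does not block the event loop.
    async with aiofiles.open(path, mode='w', encoding='UTF-8') as f:
        await f.write(html)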
Update Sat 3 Apr 13:49:55 UTC 2021:
Example:
import asyncio
import traceback
import aiohttp
WORKERS = 5
URLS = [
    "http://airbnb.com",
    "http://amazon.co.uk",
    "http://amazon.com",
    "http://baidu.com",
    "http://basecamp.com",
    "http://bing.com",
    "http://djangoproject.com",
    "http://envato.com",
    "http://facebook.com",
    "http://github.com",
    "http://gmail.com",
    "http://google.co.uk",
    "http://google.com",
    "http://google.es",
    "http://google.fr",
    "http://heroku.com",
    "http://instagram.com",
    "http://linkedin.com",
    "http://live.com",
    "http://netflix.com",
    "http://rubyonrails.org",
    "http://shopify.com",
    "http://stackoverflow.com",
    "http://trello.com",
    "http://wordpress.com",
    "http://yahoo.com",
    "http://yandex.ru",
    "http://yiiframework.com",
    "http://youtube.com",
]
class Bot:
    async def fetch(self, client, url):
        async with client.get(url) as r:
            return await r.text()

    async def worker(self, q, client):
        loop = asyncio.get_running_loop()
        while True:
            url = await q.get()
            try:
                html = await self.fetch(client, url)
            except Exception:
                traceback.print_exc()
            else:
                await loop.run_in_executor(None, self.save_to_disk, url, html)
            finally:
                q.task_done()

    def save_to_disk(self, url, html):
        print(f"{url} ({len(html)})")


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []
    async with aiohttp.ClientSession() as client:
        bot = Bot()
        for _ in range(WORKERS):
            tasks.append(asyncio.create_task(bot.worker(q, client)))
        for url in URLS:
            await q.put(url)
        await q.join()
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
    asyncio.run(main())
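The save_to_disk above only prints the page length. If you want the actual files, a hypothetical variant could look like the sketch below; the md5-derived filename is just an assumption to keep names unique (the question's code overwrote the same myPath.html for every page). Because worker() calls it via run_in_executor, the blocking open/write here won't stall the event loop:

import hashlib

def save_to_disk(self, url, html):
    # Illustrative only: hash the URL to get a filesystem-safe, unique name.
    name = hashlib.md5(url.encode()).hexdigest()
    with open(f"{name}.html", mode='w', encoding='UTF-8') as f:
        f.write(html)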
Answered By - HTF