Issue
For some one-time task I need to go through all the records in a database of which there are a few millions, read a value in a cell, make a HTTP request and update another cell which is currently NULL
.
I want to send all of them by portions, asynchronously, via asyncio
. And not too many at a time because remote server may ban me: No more than 50 requests/second or at a time.
I've found this code:
import asyncio
import aiohttp
async def one(session, url):
# request the URL and read it until complete or canceled
async with session.get(url) as resp:
await resp.text()
async def fire(urls):
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(loop.create_task(one(session, url)))
# 10 seconds
try:
await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
except asyncio.TimeoutError:
pass
loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))
But it will send all the request at once.
How could I do it N
at a time? Meaning, send N
, then wait for 1 ... a few or even all of them to return values, then send another lot of N
... and so on.
Solution
Option A: With just asyncio
in batches
Python <3.11
from asyncio import create_task, gather, run, sleep
from aiohttp import ClientSession
async def get_one(session: ClientSession, url: str) -> None:
print("Requesting", url)
async with session.get(url) as resp:
text = await resp.text()
await sleep(2) # for demo purposes
print("Got response from", url, text.strip().split("\n", 1)[0])
async def get_all(urls: list[str], num_concurrent: int) -> None:
url_iterator = iter(urls)
keep_going = True
async with ClientSession() as session:
while keep_going:
tasks = []
for _ in range(num_concurrent):
try:
url = next(url_iterator)
except StopIteration:
keep_going = False
break
new_task = create_task(get_one(session, url))
tasks.append(new_task)
await gather(*tasks)
async def main() -> None:
urls = [
"https://github.com",
"https://stackoverflow.com",
"https://python.org",
]
await get_all(urls, 2)
run(main())
Output:
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Got response from https://stackoverflow.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://python.org <!doctype html>
You'll notice that the third requests (to python.org
) is only sent after both previous requests have returned a response. This setup will essentially perform your total number of requests in batches of num_concurrent
.
Python >=3.11
With the newer TaskGroup
class, we can make the get_all
function a bit more concise:
from asyncio import TaskGroup, run, sleep
from aiohttp import ClientSession
async def get_one(session: ClientSession, url: str) -> None:
... # same as above
async def get_all(urls: list[str], num_concurrent: int) -> None:
url_iterator = iter(urls)
keep_going = True
async with ClientSession() as session:
while keep_going:
with TaskGroup() as tg:
for _ in range(num_concurrent):
try:
url = next(url_iterator)
except StopIteration:
keep_going = False
break
tg.create_task(get_one(session, url))
...
Option B: With just asyncio
in a Queue
The asyncio.Queue
allows us to set a maximum size for it. That makes it possible to limit the maximum number of concurrently executing tasks, but we will need to use the consumer-producer-pattern:
from asyncio import Queue, create_task, gather, run, sleep
from aiohttp import ClientSession
async def get_one(session: ClientSession, url: str) -> None:
... # same as above
STOP_SENTINEL = object()
async def consumer(session: ClientSession, q: Queue[str]) -> None:
url = await q.get()
while url is not STOP_SENTINEL:
await get_one(session, url)
q.task_done()
url = await q.get()
q.task_done()
async def main() -> None:
urls = [
"https://github.com",
"https://stackoverflow.com",
"https://python.org",
]
num_concurrent = 2
q = Queue(maxsize=num_concurrent)
async with ClientSession() as session:
consumers = [
create_task(consumer(session, q))
for _ in range(num_concurrent)
]
for url in urls:
await q.put(url)
for _ in range(num_concurrent):
await q.put(STOP_SENTINEL)
await gather(*consumers)
run(main())
Output:
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
As you can see now, that third request can be sent as soon as either of the previous two returns with a response.
That may be more efficient, even though the setup is a bit more cumbersome.
Option C: With an extra package
I used to run into similar issues with setting a fixed number of asyncio
tasks to work on a large number of actual tasks. To make this easier I wrote the asyncio-taskpool
package. With it I can do something like this:
from asyncio import run, sleep
from aiohttp import ClientSession
from asyncio_taskpool import TaskPool
async def get_one(session: ClientSession, url: str) -> None:
... # same as above
async def get_all(urls: list[str], num_concurrent: int) -> None:
pool = TaskPool()
async with ClientSession() as session:
pool.starmap(
get_one,
((session, url) for url in urls),
num_concurrent=num_concurrent,
)
await pool.gather_and_close()
async def main() -> None:
urls = [
"https://github.com",
"https://stackoverflow.com",
"https://python.org",
]
await get_all(urls, 2)
run(main())
Output: (same as with the Queue
approach)
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
You'll notice again that the third request will only be made after at least one of the other two returns with a response.
You can try that out with larger numbers of tasks. The number being executed concurrently at any given time will never exceed num_concurrent
as passed to map
(starmap
is just a variant of map
).
I tried to emulate the standard multiprocessing.Pool
interface to an extent and find this more convenient to use, especially with long-running tasks.
Answered By - Daniil Fajnberg
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.