Issue
I'm writing a python script that checks the uptime of a few domains every few minutes. I run a coroutine for each website in a while loop and sleep them after the check is done. This works, however I also want to be able to cancel them. The problem I'm having is that when I await these coroutines in asyncio.gather(), they block the thread, since they never return a result.
If I remove await asyncio.gather(*tasks.values(), return_exceptions=True)
I get RuntimeError: Session is closed
How can I run them without blocking the thread?
This is a simplified version of the code. I'm using a simple aiohttp server for testing.
Server code:
from aiohttp import web
import asyncio
import random
async def handle(request: web.Request) -> web.Response:
await asyncio.sleep(random.randint(0, 3))
return web.Response(text=f"Hello, from {request.rel_url.path}")
app = web.Application()
app.router.add_route('GET', '/{name}', handle)
web.run_app(app)
Uptime checker code:
import asyncio
import aiohttp
LIMIT = 2
async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
while True:
try:
async with semaphore:
async with session.get(url) as response:
if response.status != 200:
print(f"error with {url} {response.status}")
else:
print(f"success with {url}")
await asyncio.sleep(5)
except Exception as e:
print(f"error with {url} {e}")
async def main() -> None:
urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
tasks = {}
semaphore = asyncio.BoundedSemaphore(LIMIT)
try:
async with aiohttp.ClientSession() as session:
for url in urls:
tasks[url] = asyncio.create_task(
check_uptime_coro(session, url, semaphore))
await asyncio.gather(*tasks.values(), return_exceptions=True)
print("This doesn't print!")
except Exception as e:
print(f"error! {e}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("This also doesn't print!")
Solution
You have to understnad that your call do asyncio.gather
is not "blocking the thread" - it is, instead, blocking the task.
If you want to run things while the checker-tasks are running, and the web session is open, simply fire the gather
to synchronize those tasks in a different task than the one you want to run things on.
If the tasks will never return, you actually don't need even to call gather on then - just take note of all tasks in a set or other container, and keep that around so they are not de-referenced, and can be properly cancelled when the time comes.
Other than that, the sole reason you are getting the error you report when not awaiting for the tasks with gather is because your execution falls off the with
statement block that opens the session.
You could just not use a with
block to start with, and call the __enter__
and __exit__
methods manually - but you can also simply rewrite things so that that with block is in a searate task, as I mentioned above. In Python 3.11 you can use task groups: they will work better than gather
and cancel all checker-tasks when the parent task itself is cancelled.
import asyncio
import aiohttp
LIMIT = 2
async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
while True:
try:
async with semaphore:
async with session.get(url) as response:
if response.status != 200:
print(f"error with {url} {response.status}")
else:
print(f"success with {url}")
await asyncio.sleep(5)
except Exception as e:
print(f"error with {url} {e}")
async def check_uptime_master():
urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
tasks = {}
semaphore = asyncio.BoundedSemaphore(LIMIT)
async with aiohttp.ClientSession() as session:
for url in urls:
tasks[url] = asyncio.create_task(
check_uptime_coro(session, url, semaphore))
await asyncio.gather(*tasks.values(), return_exceptions=True)
async def main() -> None:
try:
checker_task = asyncio.create_task(check_uptime_master())
await asyncio.sleep(0) # give the asyncio loop a chance to fire-up the subtasks
print("This now, does print!")
except Exception as e:
print(f"error! {e}")
# go on with your code on the main task, DOn't forget to yield to the loop
# so subtasks can run!
...
if __name__ == "__main__":
asyncio.run(main()) # this is the new recomended way to fire asyncio
# loop = asyncio.get_event_loop() #<- obsolete
# loop.run_until_complete(main()) # <- obsolete
print("This also doesn't print - and will not until yo write code that explictly cancels the `checker_task` above")
Answered By - jsbueno
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.