Issue
I have an application which already runs infinitely with asyncio event loop run forever and also I need to run a specific function every 10 seconds.
def do_something():
pass
a = asyncio.get_event_loop()
a.run_forever()
I would like to call the function do_something every 10 seconds. How to achieve this without replacing asynctio event loop with while loop ?
Edited: I can achieve this with the below code
def do_something():
pass
while True:
time.sleep(10)
do_something()
But I dont want to use while loop to run infinitely in my application instead I would like to go with asyncio run_forever(). So how to call the same function every 10 seconds with asyncio ? is there any scheduler like which will not block my ongoing work ?
Solution
asyncio
does not ship with a builtin scheduler, but it is easy enough to build your own. Simply combine a while
loop with asyncio.sleep
to run code every few seconds.
async def every(__seconds: float, func, *args, **kwargs):
while True:
func(*args, **kwargs)
await asyncio.sleep(__seconds)
a = asyncio.get_event_loop()
a.create_task(every(1, print, "Hello World"))
...
a.run_forever()
Note that the design has to be slightly different if func
is itself a coroutine or a long-running subroutine. In the former case use await func(...)
and in the latter case use asyncio
's thread capabilities.
Answered By - MisterMiyagi
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.