Issue
The following is a double question but it's significant to know if it could.
After searching for an alternative I found some methods like asyncio.sleep(), asyncio.Timer(), Threading.Timer(), ...etc
However, after making the research in detail I found that anything working with coroutines is the best choice to get rid of the memory lack so, from searching I found something called asyncio.Timer() which is the best choice.
Basically, what I want to do is a game that can handle more than one million events for one million users.
For example, I need to "upgrade building" and this would be an event that will take n of time to upgrade it. Now I would like to make such a workflow to trigger at a specific time when the time of upgrading a building is ready, there are many ways to achieve that like job-crons I know but I need to achieve that by seconds as well not only by minutes like job-crons do as well as I need to go apart from lacking memory when I use traditional way when I use sleep by time.sleep() or by asyncio.sleep().
However, when I used asyncio.Timer() I got AttributeError because Timer is not a method to work with.
Solution
You can use loop.call_at
to have similar behavior to threading.Timer
, asyncio.Timer
does not exist in Python Standard Library.
Note that loop.call_at
gets when
argument, the value of the argument should be based on loop.time()
, so it is internat eventloop time.
import asyncio
from functools import partial
def callback_one() -> None:
print("Callback_one")
def callback_n(n: int) -> None:
print(f"Callback {n}")
async def main():
print("Started")
loop = asyncio.get_running_loop()
print(loop.time())
timer_handler_1 = loop.call_at(loop.time() + 10, callback_one)
timer_handler_2 = loop.call_at(loop.time() + 5, partial(callback_n, n=5))
print(timer_handler_1.when(), timer_handler_2.when())
await asyncio.sleep(20)
print("Finished")
if __name__ == '__main__':
asyncio.run(main())
Answered By - Artiom Kozyrev
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.