Issue
Context:
I'm currently coding a bot on Discord. The bot has a server class within it (with none of the fancy websockets and http requests) and a client class that serves as a bridge between the user and the server. The instance of the client class manages sending log messages to its corresponding user, updating its GUI (which is just an embed and a bunch of buttons attached to it which), and calling methods on the server class.
Currently I'm stuck on log messages. The current system is that a GUI message that contains the controls would always be the most recently sent message.
If another user were to join a room on the server class, this would cause the GUI message to not be updated anymore. Additionally, a log message would be sent to the user, which would cause the GUI message to not be the most recently sent message. Both problems are solved by the bot deleting the old GUI message and sending the updated one after that.
However, concurrent room joins may occur, so there's a chance the bot would interleave the "delete message" and "send message" parts of updating the GUI message like this:
delete_message()
delete_message() # !!!
send_message()
send_message()
The second delete_message()
would cause an error, since it can't find a message that has already been deleted.
My proposed solution would be the problem below.
Problem:
Let's say I have an async function called foo
:
import asyncio
limit: int
async def foo():
print("bar")
async def foo_caller():
await asyncio.gather(foo(), foo(), foo(), foo(), foo())
await foo()
await foo()
This function would be called multiple times using the foo_caller
function. Currently, this would print bar
7 times.
The problem is, How to execute only one function call when foo
is called multiple times in a short timeframe?
The solution should print bar
only three times. One for the await asyncio.gather(foo(), foo(), foo(), foo(), foo())
, and one each for the await foo()
.
Solution
Here is a class, "Regulator", which can be used to wrap any Callable in a way that meets your requirements. The function will never be called more than once in a given time interval. Excess calls will be discarded.
The main
function is almost the same as your foo_caller
but I added some time delays so that it will be clear that the test program works. The program prints "bar" three times.
import asyncio
from typing import Callable
class Regulator:
def __init__(self, interval: float, f: Callable, *args, **kwargs):
"""
Do not call the function f more than one per t seconds.
interval is a time interval in seconds.
f is a function
*args and **kwargs are the usual suspects
"""
self.interval = interval
self.f = f
self.args = args
self.kwargs = kwargs
self.busy = False
async def __call__(self):
if not self.busy:
self.busy = True
asyncio.get_event_loop().call_later(self.interval, self.done)
self.f(*self.args, **self.kwargs)
def done(self):
self.busy = False
def say_bar():
print("bar")
foo = Regulator(0.5, say_bar)
async def main():
await asyncio.gather(foo(), foo(), foo(), foo(), foo())
await asyncio.sleep(1.0)
await foo()
await asyncio.sleep(1.0)
await foo()
if __name__ == "__main__":
asyncio.run(main())
Answered By - Paul Cornelius
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.