Issue
I want my FastAPI app to have access to always actual bot_data
of python-telegram-bot.
I need that so when i call some endpoint in FastAPI could, for example, send messages to all chats, stored somewere in bot_data
.
As i understand the problem: bot.run_polling()
and uvicorn.run(...)
launch two independent async loops. And i need to run them in one.
UPD-1:
Thanks to @MatsLindh I created next function which i pass to main block, but it works inconsistent. Some times bot.run_polling() (gets correct loop and everything works, but other times and breaks with error that there are different loops):
import asyncio
from uvicorn import Config, Server
# --snip--
def run(app: FastAPI, bot:Application):
# using get_event_loop leads to:
# RuntimeError: Cannot close a running event loop
# I guess it is because bot.run_polling()
# calls loop.run_until_complete() different tasks
# loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
server = Server(Config(app=app, port=9001))
loop.create_task(server.serve())
t = Thread(target=loop.run_forever)
t.start()
bot.run_polling()
t.join()
# --snip--
if __name__ == "__main__":
# --snip--
run(f_app, bot_app)
Also i know I could decompose bot.run_polling()
into several separate calls that are agregated inside, but I am sure it should work with just that shortuct funcion.
Initial
My simplified setup looks like below.
Initially I tried to run not with threads but with multiprocessing.Proccess
, however in that way my bot_data
was always empty - i assumed it is because bot data not shared between processes so whole thing must be in one process. And here I am failing in to run all these stuff in one async loop.
# main.py
# python3.10
# pip install fastapi[all] python-telegram-bot
from threading import Thread
import uvicorn
from telegram.ext import Application, ApplicationBuilder, PicklePersistence
from fastapi import FastAPI, Request
BOT_TOKEN = "telegram-bot-token"
MY_CHAT = 123456
class MyApp(FastAPI):
def add_bot(self, bot_app: Application):
self.bot_app = bot_app
async def post_init(app: Application):
app.bot_data["key"] = 42
f_app = MyApp()
@f_app.get("/")
async def test(request: Request):
app: MyApp = request.app
bot_app: Application = app.bot_app
val = bot_app.bot_data.get('key')
print(f"{val=}")
await bot_app.bot.send_message(MY_CHAT, f"Should be 42: {val}")
if __name__ == "__main__":
pers = PicklePersistence("storage")
bot_app = ApplicationBuilder().token(BOT_TOKEN).post_init(post_init).persistence(pers).build()
f_app.add_bot(bot_app)
t1 = Thread(target=uvicorn.run, args=(f_app,), kwargs={"port": 9001})
t1.start()
# --- Launching polling in main thread causes
# telegram.error.NetworkError: Unknown error in HTTP implementation:
# RuntimeError('<asyncio.locks.Event object at 0x7f2764e6fd00 [unset]> is bound to a different event loop')
# message is sent and value is correct, BUT app breaks and return 500
# bot_app.run_polling()
# --- Launching polling in separate thread causes
# RuntimeError: There is no current event loop in thread 'Thread-2 (run_polling)'.
# t2 = Thread(target=bot_app.run_polling)
# t2.start()
# --- Launching with asyncio causes:
# ValueError: a coroutine was expected, got <bound method Application.run_polling ...
# import asyncio
# t2 = Thread(target=asyncio.run, args=(bot_app.run_polling,))
# t2.start()
t1.join()
Solution
When calling uvicorn.run()
, a new event loop is created (internally, asyncio.run()
is being called—see the linked source code). When attemping to lunch another application after starting the uvicorn
server (and hence, the FastAPI app)—or, vice versa—that also creates a new event loop, such as your Telegram bot app, that line of code to start the other application will not going to be reached, until the already running event loop is exited. This is because running an event loop is blokcing, meaning that it will block the calling thread until the event loop is terminated.
If you also attempted running the other application (essentially, an event loop) within an app that is already using an event loop, or attempted calling asyncio.run()
or there is more than one call to loop.run_until_complete()
within the app, you would come across errors such as:
> RuntimeError: Cannot run the event loop while another loop is running
> RuntimeError: asyncio.run() cannot be called from a running event loop
> RuntimeError: This event loop is already running
There are a few ways to solve this. For demontration purposes, the solutions given below use a simple printing app as the second application that also creates an event loop. This app is as follows:
printing_app.py
import asyncio
async def go():
counter = 0
while True:
counter += 1
print(counter)
await asyncio.sleep(1)
def run():
asyncio.run(go())
Solution 1
You can use uvicorn.Server.serve()
to run uvicorn
from an already running async
environment (see also the implementation of Config
class for all the available parameters, i.e., host
, port
, etc.). First, use asyncio.new_event_loop()
to create a new event loop and then set it as the current event loop for the current thread, using asyncio.set_event_loop()
. Next, schedule the execution of the other asynchronous app, by using loop.create_task()
and passing a coroutine to it (i.e., a coroutine object is the result of calling an async def
function), not the method that executes the asyncio.run()
function. In printing_app.py
above, that is the go()
function. The coroutine that is wrapped in the task may not run immediately. It is scheduled and will run as soon as the event loop finds an opportunity to execute the task—as described in this answer, this may happen when the currently-running coroutine reaches an await
expression, as well as an async for
or async with
block, as these operations use await
under the hood.
Finally, use loop.run_until_complete()
to run the uvicorn server, by passing the uvicorn.Server.serve()
coroutine—if the argument passed to loop.run_until_complete()
is a coroutine, it is wrapped in a Task
(see the relevant implementation, as well as the documentation link above); hence, there is no need for one calling loop.create_task()
on the coroutine this time. It will execute the provided task and block until it is complete.
In the interest of clarity, asyncio.new_event_loop()
, followed by asyncio.set_event_loop()
and loop.run_until_complete()
is what actually happens behind the scenes when using asyncio.run()
—see the latest Python's Runner
class implementation, as well as the implementation of the run()
method in Python 3.10 (which might be more clear).
P.S. One could alternatively create every task using create_task()
and finally call loop.run_forever()
, which will run the event loop forever, until it is explicitly stopped by calling its stop()
method. On the other hand, loop.run_until_complete()
will keep running until the task you passed to it is complete and the result is returned (or when an exception is raised). Depending on one's needs, as well as the nature of tasks that they have to execute, may choose between the two.
Example 1
from fastapi import FastAPI
import printing_app
import asyncio
import uvicorn
app = FastAPI()
@app.get('/')
def main():
return 'Hello World!'
def start_uvicorn(loop):
config = uvicorn.Config(app, loop=loop)
server = uvicorn.Server(config)
loop.run_until_complete(server.serve())
def start_printing_app(loop):
loop.create_task(printing_app.go()) # pass go() (coroutine), not run()
if __name__ == '__main__':
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_printing_app(loop)
start_uvicorn(loop)
Example 2
Since this is a FastAPI application, you could run the server as usual (using uvicorn.run(app)
), and utilise FastAPI's/Starlette's Lifespan events to execute the second app at startup
. To execute it, you can use asyncio.create_task()
, which will wrap the coroutine into a task, as explained earlier, and schedule its execution. The task will be executed in the loop returned by asyncio.get_running_loop()
, which returns the event loop in the current thread. Alternatively, you could call asyncio.get_running_loop()
yourself to get the running event loop, and then use the create_task()
function, as mentioned earlier, to execute the task.
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import printing_app
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
asyncio.create_task(printing_app.go())
# Alternatively:
#loop = asyncio.get_running_loop()
#loop.create_task(printing_app.go())
yield
app = FastAPI(lifespan=lifespan)
@app.get('/')
def main():
return 'Hello World!'
if __name__ == '__main__':
uvicorn.run(app)
Example 3
Another variation would be to use asyncio.run()
to create an async
environment to run the app, then call asyncio.create_task()
to start the other application, and finally, use await server.serve()
to start the uvicorn server—any further code after that last part would be executed once the uvicorn server has finished running or forced to exit (e.g., when pressing CTRL + C
).
from fastapi import FastAPI
import asyncio
import printing_app
import uvicorn
app = FastAPI()
@app.get('/')
def main():
return 'Hello World!'
async def main():
# start printing app
asyncio.create_task(printing_app.go())
# start uvicorn server
config = uvicorn.Config(app)
server = uvicorn.Server(config)
await server.serve()
if __name__ == '__main__':
asyncio.run(main())
Solution 2
Another solution would be to use nest_asyncio
, as demonstrated here, which allows running multiple asyncio
event loops in nested environments. However, it is generally recommended to avoid using nested event loops, as it could lead to unexpected behavior.
Running Telegram Bot app within FastAPI app
As mentioned in this comment on github by a maintainer of the relevant library, using Application.run_polling()
is purely optional and would block the event loop until the user sends a stop signal; that is what makes run_polling()
unsuitable when combined with ASGI frameworks, such as FastAPI. In that case, you can just manually call the methods that run_polling()
actually runs behind the scenes. An example showing how to run uvicorn server on Starlette application, along with a telegram-bot application, can be seen here. Based on that example and all the information provided earlier, the following solutions are provided.
Example 1
from fastapi import FastAPI
import asyncio
import uvicorn
app = FastAPI()
@app.get('/')
def main():
return 'Hello World!'
async def main():
config = uvicorn.Config(app, host='0.0.0.0', port=8000)
server = uvicorn.Server(config)
application = .... # initialise your telegram-bot app
# Run application and webserver together
async with application:
await application.start()
await server.serve()
await application.stop()
if __name__ == '__main__':
asyncio.run(main())
Example 2
from fastapi import FastAPI
from contextlib import asynccontextmanager
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
application = .... # initialise your telegram-bot app
await application.start()
yield
await application.stop()
app = FastAPI(lifespan=lifespan)
@app.get('/')
def main():
return 'Hello World!'
if __name__ == '__main__':
uvicorn.run(app)
Answered By - Chris
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.