Issue
I'm building an async backend for an analytics system using FastAPI. It has to: a) listen for API calls and be available at all times; b) periodically perform a data-gathering task (parsing data and saving it to the DB).
I wrote this function to act as a daemon:
async def start_metering_daemon(self) -> None:
    """sets a never-ending task for metering"""
    while True:
        delay: int = self._get_delay()  # delay in seconds until next execution
        await asyncio.sleep(delay)
        await self.gather_meterings()  # perform data gathering
What I'm trying to achieve is that when the app starts, it also adds this daemon function to the main event loop and executes it whenever the loop is free. However, I haven't been able to find a solution that matches the scale of the task (adding Celery and similar tools is overkill).
I have tried the following approaches, but none of them worked:
@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    await Gatherer().start_metering_daemon()
Result: the server can't start up because the thread is blocked
@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    fastapi.BackgroundTasks().add_task(Gatherer().start_metering_daemon)
Result: task is never executed as observed in logs
@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    fastapi.BackgroundTasks().add_task(asyncio.run, Gatherer().start_metering_daemon())
Result: same as previous one
@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    threading.Thread(target=asyncio.run, args=(Gatherer().start_metering_daemon(),)).start()
Result: this one works, but a) it makes no sense; b) it spawns N identical threads for N Uvicorn workers, which all write the same data into the DB N times.
I am out of ideas by now. I am pretty sure there must be a solution to my problem, since it looks pretty trivial to me, but I couldn't find one.
If you want more context, here is the repo of the project I refer to.
Solution
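Try scheduling the coroutine as a task on the running event loop, so the startup handler returns right away: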
@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    asyncio.create_task(Gatherer().start_metering_daemon())
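One thing worth adding: the asyncio documentation recommends keeping a reference to the task returned by asyncio.create_task, because the event loop only holds a weak reference to it and an unreferenced task can be garbage-collected mid-run. Below is a minimal sketch of that idea in plain asyncio, without FastAPI, so it can be run on its own. The Gatherer class here is a hypothetical stand-in for the one in the question; the start and stop methods, the runs counter and the fixed delay are assumptions made for illustration only.

```python
import asyncio
from typing import Optional


class Gatherer:
    """Hypothetical stand-in for the Gatherer class from the question."""

    def __init__(self, delay: float = 0.01) -> None:
        self._delay = delay  # assumed fixed delay instead of _get_delay()
        self.runs = 0        # counts completed gathering passes
        self._task: Optional[asyncio.Task] = None

    async def gather_meterings(self) -> None:
        # Placeholder for the real parsing and DB writes.
        self.runs += 1

    async def start_metering_daemon(self) -> None:
        # Same shape as the loop in the question: sleep, then gather.
        while True:
            await asyncio.sleep(self._delay)
            await self.gather_meterings()

    def start(self) -> None:
        # Schedule the loop on the running event loop and return at once.
        # Storing the Task keeps a strong reference to it while it runs.
        self._task = asyncio.create_task(self.start_metering_daemon())

    async def stop(self) -> None:
        # Cancel the loop and wait until the cancellation has finished.
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None


async def demo() -> int:
    gatherer = Gatherer()
    gatherer.start()          # returns immediately, like the startup handler
    await asyncio.sleep(0.1)  # the event loop is free to do other work here
    await gatherer.stop()     # what a shutdown handler would do
    return gatherer.runs
```

In a FastAPI app, start() would presumably be called from the startup handler and stop() from a matching @app.on_event("shutdown") handler. Note that this still runs one daemon per Uvicorn worker, so the duplicate-write problem from the threading attempt applies here as well when more than one worker is used.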
Answered By - Daniil Trotsenko