Issue
I have a Python Flask backend app. I want to create a mechanism that lets some jobs run periodically in the background, such as auto-updating something every minute.
But I am blocked by some errors that make the backend behave unexpectedly.
view_func.py
from flask import Blueprint, request
from async_periodic_job import AsyncPeriodictUtils
# Blueprint on which the autosync route below is registered.
infras_bp = Blueprint("infras", __name__)


@infras_bp.route("/infras/autosync", methods=["PUT"])
def auto_sync():
    """Start or stop the background periodic task.

    Reads the ``autosnyc`` query parameter (spelled as the API expects it).
    The value ``"true"``, compared case-insensitively, starts the task; any
    other value stops it.

    Returns:
        The tuple ``("success", 200)``.
    """
    query = request.args
    # presumably rejects the request when the parameter is missing;
    # check_required_params is defined elsewhere -- TODO confirm
    check_required_params(query, ["autosnyc"])
    raw_flag = query.get("autosnyc", default="false")
    if raw_flag.lower() == "true":
        AsyncPeriodictUtils.start_task()
    else:
        AsyncPeriodictUtils.stop_task()
    return "success", 200
async_periodic_job.py
import asyncio
from typing import Callable
from utils import logger
# Time units, expressed in seconds.
SECOND = 1
MINUTE = SECOND * 60

logger.debug(f"Import {__file__}")

# Registry of periodic jobs, keyed by each function's __name__.
periodic_jobs = {}
# Handle of the current periodic task; None while no task is active.
task_instance = None
# Event loop obtained once at import time (intended to be the main thread's
# default loop) and shared by every helper that schedules work.
loop = asyncio.get_event_loop()
class AsyncPeriodictUtils:
    """Helpers that manage a single background asyncio task.

    All state lives in the module-level globals ``periodic_jobs``,
    ``task_instance`` and ``loop``; every method is a static or class method.
    """

    @staticmethod
    async def run_jobs() -> None:
        """Call every registered job once per 10-second cycle, forever.

        The coroutine has no exit condition of its own: it ends only when it
        is cancelled or when a job raises.
        """
        while True:
            await asyncio.sleep(10)
            logger.info(f"Called run_jobs periodicly.")
            logger.info(f"periodic_jobs: {periodic_jobs.keys()}")
            for job_name, job in periodic_jobs.items():
                job()
                logger.info(f"Called function '{job_name}' periodicly.")

    @classmethod
    def create_task(cls) -> None:
        """Schedule ``run_jobs`` on the shared loop and remember the task."""
        global task_instance
        task_instance = loop.create_task(cls.run_jobs())

    @staticmethod
    async def cancel_task() -> None:
        """Cancel the remembered task, wait for it to finish, then forget it.

        Logs a warning and does nothing else when no task is recorded.
        """
        global task_instance
        if not task_instance:
            logger.warning("Async periodic task has not been started yet.")
            return
        task_instance.cancel()
        try:
            # Awaiting lets the cancellation actually propagate into the task.
            await task_instance
        except asyncio.CancelledError:
            logger.info("Async periodic task has been cancelled.")
        task_instance = None

    @classmethod
    def start_task(cls) -> None:
        """Create the periodic task and run the shared loop on it.

        ``run_jobs`` loops forever, so ``run_until_complete`` hands control
        back only once the task is cancelled (the resulting
        ``CancelledError`` is swallowed) or fails.

        NOTE(review): ``run_until_complete`` raises
        ``RuntimeError: This event loop is already running`` when ``loop`` is
        already running -- this is the failure shown in the first traceback
        of this post.
        """
        cls.create_task()
        try:
            loop.run_until_complete(task_instance)
        except asyncio.CancelledError:
            pass
        logger.info("Async Periodic jobs launched.")

    @classmethod
    def stop_task(cls) -> None:
        """Run ``cancel_task`` to completion on the shared loop.

        NOTE(review): as in ``start_task``, ``run_until_complete`` raises
        ``RuntimeError`` when ``loop`` is already running; the coroutine is
        then never awaited and the task keeps running, as the second
        traceback of this post shows.
        """
        try:
            loop.run_until_complete(cls.cancel_task())
        except asyncio.CancelledError:
            pass
        logger.info("Async Periodic jobs terminated.")

    @classmethod
    def add_job(cls, function: Callable) -> None:
        """Register ``function`` under its ``__name__``.

        A name that is already registered is left untouched. If no task is
        recorded yet, the periodic task is started automatically.
        """
        job_name = function.__name__
        if job_name in periodic_jobs:
            return
        periodic_jobs[job_name] = function
        logger.info(f"Added function {job_name} in periodic jobs.")
        if task_instance:
            return
        logger.info(f"Auto enable periodic jobs.")
        cls.start_task()

    @classmethod
    def remove_job(cls, function: Callable) -> None:
        """Unregister ``function`` by its ``__name__``.

        Logs a warning when the name is not registered. When the registry
        becomes empty, the periodic task is stopped automatically.
        """
        job_name = function.__name__
        if job_name not in periodic_jobs:
            logger.warning(
                f"function {job_name} not in periodic jobs.")
            return
        del periodic_jobs[job_name]
        logger.info(f"Removed function {job_name} in periodic jobs.")
        if periodic_jobs:
            return
        logger.info(f"Periodic jobs list clear, auto terminate.")
        cls.stop_task()
When I call AsyncPeriodictUtils.start_task(),
my backend raises an error and responds with 500; the log is shown below:
[2023-10-13-06:06:19][DAAT][ERROR][__init__.py] [Error] stack:
[2023-10-13-06:06:19][DAAT][ERROR][__init__.py] Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1820, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1796, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/app/routes/infras.py", line 134, in auto_sync
AsyncPeriodictUtils.start_task()
File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/infras/async_periodic_job.py", line 53, in start_task
loop.run_until_complete(task_instance)
File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
self._check_running()
File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
[2023-10-13-06:06:19][werkzeug][INFO][_internal.py] 172.23.0.3 - - [13/Oct/2023 06:06:19] "PUT /infras/autosync?autosnyc=true HTTP/1.1" 500
But after the above step, the periodic jobs seem to have been launched successfully, as the logs below show:
[2023-10-13-06:06:23][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:23][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:33][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
When I call AsyncPeriodictUtils.stop_task(),
my backend raises an error and responds with 500; the log is shown below:
[2023-10-13-06:06:35][DAAT][ERROR][__init__.py] [Error] stack:
[2023-10-13-06:06:35][DAAT][ERROR][__init__.py] Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1820, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/conda/lib/python3.10/site-packages/flask/app.py", line 1796, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/app/routes/infras.py", line 136, in auto_sync
AsyncPeriodictUtils.stop_task()
File "/opt/conda/lib/python3.10/site-packages/daat-1.0.0-py3.10.egg/daat/infras/async_periodic_job.py", line 62, in stop_task
loop.run_until_complete(cls.cancel_task())
File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
self._check_running()
File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
/opt/conda/lib/python3.10/site-packages/flask/app.py:1822: RuntimeWarning: coroutine 'AsyncPeriodictUtils.cancel_task' was never awaited
rv = self.handle_user_exception(e)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
[2023-10-13-06:06:35][werkzeug][INFO][_internal.py] 172.23.0.3 - - [13/Oct/2023 06:06:35] "PUT /infras/autosync?autosnyc=false HTTP/1.1" 500 -
But this didn't terminate the background periodic task as expected; the task is still running periodically:
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:43][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:06:53][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:03][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
[2023-10-13-06:07:13][DAAT][INFO][async_periodic_job.py] Called run_jobs periodicly.
[2023-10-13-06:07:13][DAAT][INFO][async_periodic_job.py] periodic_jobs: dict_keys([])
I've struggled with this, trying different possibilities, but still couldn't figure it out.
Could anyone please check my code and point out what I should change, or provide any guidance or direction? Thanks.
I expect:
AsyncPeriodictUtils.start_task()
: Launch the periodic task without raising an exception that makes the backend respond with 500.
AsyncPeriodictUtils.stop_task()
: Terminate the periodic task without raising an exception that makes the backend respond with 500.
Solution
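The error is raised because `loop.run_until_complete()` cannot be called on an event loop that is already running. You might need to either create another loop to perform this extra task (for example, a dedicated loop running in a background thread) or add the task to the currently running loop instead of calling `run_until_complete()` on it. Refer to this answer for more details.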
However, IMO these periodic tasks should be executed by a cron job (server side) or `setInterval` (JS). The core idea is to leave the server as it is and call the HTTP API from the server or the client, as your needs dictate. Your backend framework should not be mixed with work unrelated to handling HTTP requests.
Answered By - thekingofcity
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.