Issue
I've been using Celery for a while a now, in production I use RabbitMQ as the broker and Redis for the backend in a K8s cluster with no problems so far. Locally, I run a docker compose with a few services (Flask API, 2 different Workers, Beat, Redis, Flower, Hasura), using Redis as both the Broker and the Backend.
I haven't experienced problems with this setup for the past months, but yesterday I started getting erratic behavior while accessing task results.
Tasks are sent to queue, the worker recognizes it and performs the task, but while querying for the task state I sometimes get DisabledBackend
. Normally on the first request, and then it works. Couldn't find a pattern of when it works and when it doesn't, it's erratic.
I've read somewhere that Celery didn't work very well with flask's builtin server so I switched to uWSGI with pretty much the same setup I have in production:
[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi
I've seen a similar question in Django in which the problem seemed to be on WSGI Mod with Apache, which is not my case, but the behavior seems similar. Every other question I've seen was related to misconfiguration of the backend, which is not my case.
Any ideas on what might be causing this? Thanks.
Solution
So it seems that I need to access AsyncResult
only via my Celery app instance, instead of through Celery, or pass the Celery app instance as an argument.
So, this doesn't work:
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id)
return task.state
This works:
from app import my_celery # Your own Celery Application Instance
@app.route('/status/<task_id>')
def get_status(task_id):
task = my_celery.AsyncResult(task_id)
return task.state
This also works:
from app import my_celery
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id, app=my_celery)
return task.state
I'm guessing what happens is that by calling AsyncResult
directly from Celery, it doesn't access Celery's configurations, hence it thinks that there's no backend configured to query results to.
But that would only explain complete failure of the function, and not the erratic behavior. I'm guessing this is because of different threads, and situations in which the app instance is being importante, so Celery finds it, not too sure though.
I've ran a couple of tests and seems to be working fine again after changing the imported AsyncResult
, but I'll keep digging.
Answered By - lowercase00
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.