Issue
I already have a mysql connection from Flask like this:
app.config['MYSQL_HOST'] = 'one.hostname.net'
app.config['MYSQL_USER'] = 'my_username'
app.config['MYSQL_PASSWORD'] = 'my_password'
app.config['MYSQL_DB'] = 'user_mydb'
mysql = MySQL(app)
with this setup I am able to use mysql database connection in flask. but when it comes to celery task, which is inside the same python file as flask.
@mycelery.task(bind=True, name='mytask')
def mytask(self, userid, port):
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
cursor.execute('SELECT * FROM mytable WHERE id = %s', (userid,))
It throws me an error saying
cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
AttributeError: 'NoneType' object has no attribute 'cursor'
I understand it is because the celery has no MySQL connection made. But How can I establish the connection? so that I don't have to connect with the MySQL server whenever I create a task just like how it is don't for flask where we already established the connection by MySQL = MySQL(app)
??
Here is my celery setup, if it is helpful to add something to this code
mycelery = Celery(app.name)
mycelery.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': 'app/broker/out',
'data_folder_out': 'app/broker/out',
'data_folder_processed': 'app/broker/processed'
},
'result_persistent': False,
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']})
Solution
Firstly if you want to do dabatabase operation you can do in celery task, but you dont have to connect db with celery. You can connect flask with db, and install celery in your project and make db operation in your celery task.
Sample:
app.py
from flask import Flask
from flaskext.mysql import MySQL
app = Flask(__name__)
mysql = MySQL()
app.config['MYSQL_DATABASE_USER'] = ''
app.config['MYSQL_DATABASE_PASSWORD'] = ''
app.config['MYSQL_DATABASE_DB'] = ''
app.config['MYSQL_DATABASE_HOST'] = ''
mysql.init_app(app)
tasks.py
@celery.task
def db_connect_things():
conn = mysql.connect()
cursor =conn.cursor()
sql_query = """select from where """
cursor.execute(sql_query)
...
celery_config.py
from celery import Celery
celery = Celery(__name__)
celery = Celery('tasks', broker=) # rabbit,redis, ..
celery.conf.update({'CELERY_ACCEPT_CONTENT': ['pickle', 'json', 'msgpack', 'yaml']})
celery.conf.add_defaults(...)
celery.conf.update(CELERYBEAT_SCHEDULE={
'db_connect_things': {
'task': 'application.lib.tasks.db_connect_things',
'schedule': crontab(minute=0, hour='*/12'),
}})
class ContextTask(celery.Task):
...
Answered By - Ayse
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.