Issue
I have a Celery project connected to a MySQL database. One of the tables is defined like this:
class MyQueues(Base):
    __tablename__ = 'accepted_queues'

    id = sa.Column(sa.Integer, primary_key=True)
    customer = sa.Column(sa.String(length=50), nullable=False)
    accepted = sa.Column(sa.Boolean, default=True, nullable=False)
    denied = sa.Column(sa.Boolean, default=True, nullable=False)
Also, in the settings I have
THREADS = 4
And I am stuck in a function in code.py:
def load_accepted_queues(session, mode=None):
    #make query
    pool = session.query(MyQueues.customer, MyQueues.accepted, MyQueues.denied)
    #filter conditions
    if (mode == 'XXX'):
        pool = pool.filter_by(accepted=1)
    elif (mode == 'YYY'):
        pool = pool.filter_by(denied=1)
    elif (mode is None):
        pool = pool.filter(\
            sa.or_(MyQueues.accepted == 1, MyQueues.denied == 1)
        )
    #generate a dictionary with data
    for i in pool: #<---------- line 90 in the error
        l.update({i.customer: {'customer': i.customer, 'accepted': i.accepted, 'denied': i.denied}})
When running this I get an error:
[20130626 115343] Traceback (most recent call last):
  File "/home/me/code/processing/helpers.py", line 129, in wrapper
    ret_value = func(session, *args, **kwargs)
  File "/home/me/code/processing/test.py", line 90, in load_accepted_queues
    for i in pool: #generate a dictionary with data
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2341, in instances
    fetch = cursor.fetchall()
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3205, in fetchall
    l = self.process_rows(self._fetchall_impl())
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3174, in _fetchall_impl
    self._non_result()
  File "/home/me/envs/me/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 3179, in _non_result
    "This result object does not return rows. "
ResourceClosedError: This result object does not return rows. It has been closed automatically
So mainly it is the part
ResourceClosedError: This result object does not return rows. It has been closed automatically
and sometimes also this error:
DBAPIError: (Error) (, AssertionError('Result length not requested length:\nExpected=1. Actual=0. Position: 21. Data Length: 21',)) 'SELECT accepted_queues.customer AS accepted_queues_customer, accepted_queues.accepted AS accepted_queues_accepted, accepted_queues.denied AS accepted_queues_denied \nFROM accepted_queues \nWHERE accepted_queues.accepted = %s OR accepted_queues.denied = %s' (1, 1)
I cannot reproduce the error reliably, as it normally happens when processing a lot of data. I tried changing THREADS = 4 to 1 and the errors disappeared. However, that is not a solution, as I need to keep the number of threads at 4.
Also, I am confused about the need to use
for i in pool: #<---------- line 90 in the error
or
for i in pool.all(): #<---------- line 90 in the error
and could not find a proper explanation of it.
All together: any advice on how to avoid these difficulties?
Solution
All together: any advice on how to avoid these difficulties?
Yes. You absolutely cannot use a Session (or any objects associated with that Session), or a Connection, in more than one thread simultaneously, especially with MySQL-Python, whose DBAPI connections are very thread-unsafe*. You must organize your application so that each thread deals with its own dedicated MySQL-Python connection (and therefore its own SQLAlchemy Connection / Session / objects associated with that Session), with no leakage to any other thread.
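One common way to give each thread its own Session is SQLAlchemy's scoped_session, which keeps a thread-local registry of sessions. The following is only a minimal sketch under assumptions: an in-memory SQLite URL stands in for the MySQL connection string, and run_workers and worker are hypothetical names that do not appear in the question's code.

```python
import threading

import sqlalchemy as sa
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite replaces the real MySQL URL so the sketch runs anywhere.
engine = sa.create_engine("sqlite://")

# scoped_session hands each thread its own Session from a thread-local registry.
Session = scoped_session(sessionmaker(bind=engine))


def run_workers(n_threads=4):
    """Hypothetical helper: run a trivial query in n_threads threads."""
    results = {}
    results_lock = threading.Lock()
    # The barrier keeps every session alive at the same moment,
    # so comparing their id() values is meaningful.
    barrier = threading.Barrier(n_threads)

    def worker(name):
        session = Session()  # created on first use in this thread
        try:
            value = session.execute(sa.text("SELECT 1")).scalar()
            same_again = Session() is session  # same thread, same Session
            with results_lock:
                results[name] = (id(session), value, same_again)
            barrier.wait(timeout=10)
        finally:
            # Close and discard this thread's Session so nothing leaks.
            Session.remove()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In a setup like the one in the question, a function such as load_accepted_queues would obtain its session by calling Session() inside the thread that runs it, rather than receiving a session object created in another thread.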
* Edit: Alternatively, you can use mutexes to limit access to the Session/Connection/DBAPI connection to just one of those threads at a time, though this is less common because the high degree of locking needed tends to defeat the purpose of using multiple threads in the first place.
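The mutex alternative might look roughly like the sketch below. It rests on assumptions: the standard-library sqlite3 module stands in for MySQL-Python, the sample rows are made up, and conn_lock and run_threads are hypothetical names. Every use of the shared connection happens while holding the lock, so the threads effectively take turns.

```python
import sqlite3
import threading

# Assumption: one shared sqlite3 connection stands in for a shared MySQL-Python one.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute(
    "CREATE TABLE accepted_queues (customer TEXT, accepted INTEGER, denied INTEGER)"
)
conn.executemany(
    "INSERT INTO accepted_queues VALUES (?, ?, ?)",
    [("a", 1, 0), ("b", 0, 1), ("c", 0, 0)],  # made-up sample rows
)
conn.commit()

# The mutex that serializes all access to the shared connection.
conn_lock = threading.Lock()


def load_accepted_queues():
    # Execute and fetch while holding the lock; no other thread may touch conn meanwhile.
    with conn_lock:
        rows = conn.execute(
            "SELECT customer, accepted, denied FROM accepted_queues "
            "WHERE accepted = 1 OR denied = 1"
        ).fetchall()
    return {
        customer: {"customer": customer, "accepted": bool(acc), "denied": bool(den)}
        for customer, acc, den in rows
    }


def run_threads(n_threads=4):
    """Hypothetical helper: call load_accepted_queues from n_threads threads."""
    results = [None] * n_threads

    def worker(index):
        results[index] = load_accepted_queues()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Because the query and the fetch both sit inside the lock, the four threads never run database work concurrently, which is the cost the answer warns about.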
Answered By - zzzeek