Issue
I recently updated SQLAlchemy (with [asyncio] package) to 1.4.46
and started to get the following exception when committing:
sqlalchemy.exc.IllegalStateChangeError: Method 'commit()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>
Before updating to the new version, it was working fine.
# -*- coding:utf-8 -*-
from sqlalchemy import exc, event, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool, Pool
from contextvars import ContextVar
from sanic import Sanic
import asyncio
class EngineNotInitialisedError(Exception):
pass
class DBSessionContext:
def __init__(self, session: Session, commit_on_exit: bool = True) -> None:
self.session = session
self._query = None
self.commit_on_exit = commit_on_exit
self.token = None
async def close(self, exc_type=None, exc_value=None, traceback=None):
if self._query:
if exc_value and getattr(exc_value, 'status_code', 500) > 300:
await self._query.rollback()
self._post_processing.clear()
else:
await self._query.commit()
await self.run_post_processing()
await self._query.close()
if self._post_processing:
await self.run_post_processing()
def set_token(self, token):
self.token = token
@property
def query(self) -> Session:
if not self._query:
self._query = self.session()
return self._query
class AsyncSession(SQLAlchemyAsyncSession):
async def execute(self, statement, **parameters):
try:
if isinstance(statement, str):
# We wrap around the `text()` method automatically
statement = text(statement)
return await super().execute(statement, parameters)
except exc.OperationalError as e:
if e.orig.args[0] == 1205:
# Lock wait timeout exceeded
await self.rollback()
return await super().execute(statement, parameters)
raise e
class DBSession:
def __init__(self):
self.engine = None
self.session = None
self._session = None
self.context = ContextVar("context", default=None)
def init_app(self, app: Sanic, url: str, commit_on_exit: bool = True) -> None:
self.commit_on_exit = commit_on_exit
engine_args = {
'echo': app.config.get('DATABASE_ECHO', cast=bool, default=False),
'echo_pool': app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
'poolclass': NullPool, # will be used to create a connection pool instance using the connection parameters given in the URL
# if pool_class is not NullPool:
# the number of connections to allow in connection pool “overflow”
# 'max_overflow': app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
# if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
# 'pool_pre_ping': app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
# the number of connections to keep open inside the connection pool
# 'pool_size': app.config.get('DATABASE_POOL_SIZE', cast=int, default=5),
# this setting causes the pool to recycle connections after the given number of seconds has passed
# 'pool_recycle': app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=-1),
# number of seconds to wait before giving up on getting a connection from the pool
# 'pool_timeout': app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=3600),
}
self.engine = create_async_engine(
url,
**engine_args
)
self.session = sessionmaker(
bind=self.engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=False
)
async def __aenter__(self):
if not isinstance(self.engine, AsyncEngine):
raise EngineNotInitialisedError
session_ctx = DBSessionContext(self.session, self.commit_on_exit)
session_ctx.set_token(self.context.set(session_ctx))
return session_ctx
async def __aexit__(self, exc_type, exc_value, traceback):
session_ctx = self.context.get()
await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
self.context.reset(session_ctx.token)
@property
def query(self) -> Session:
return self.context.get().query
@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
'''Listener for Pool checkout events that pings every connection before using.
Implements pessimistic disconnect handling strategy. See also:
http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
cursor = dbapi_con.cursor()
try:
cursor.execute("SELECT 1")
except exc.OperationalError as ex:
if ex.args[0] in (2006, # MySQL server has gone away
2013, # Lost connection to MySQL server during query
2055): # Lost connection to MySQL server at '%s', system error: %d
raise exc.DisconnectionError() # caught by pool, which will retry with a new connection
else:
raise
cursor.close()
db = DBSession()
The code is called with the following :
async with db:
await db.query.execute('INSERT INTO ...')
What is causing the InvalidStateChangeError I'm having? How can I avoid this issue?
Solution
There is a discussion on the Github repository of SQLAlchemy, that gives a reason why the issue is occurring: https://github.com/sqlalchemy/sqlalchemy/discussions/9312
The suggestion is that the code is calling something like
asyncio.gather(func(session), func2(session)
with the two function sharing the same session, which causes the sqlalchemy.exc.IllegalStateChangeError
Removing the asyncio.gather
call resolve the issue. (Or use two sessions, one for each functions).
Answered By - Cyril N.
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.