flask_sqlalchemy是如何在多线程中对数据库操作不相互影响
数据库操作隔离
结论:使用scoped_session实现数据库操作隔离
flask的api.route()接收一个请求,就会创建一个新的线程去处理,请求之间不相互影响
flask_sqlalchemy是如何使用db.session使多个请求中保函的改变同一个表的sql操作不相互影响的
在flask_sqlalchemy.SQLAlchemy类中关于session的定义:
# Which stack should we use? _app_ctx_stack is new in 0.9 connection_stack = _app_ctx_stack or _request_ctx_stack def __init__(self, app=None, use_native_unicode=True, session_options=None): session_options.setdefault( 'scopefunc', connection_stack.__ident_func__ ) self.session = self.create_scoped_session(session_options) def create_scoped_session(self, options=None): """Helper factory method that creates a scoped session.""" if options is None: options = {} scopefunc=options.pop('scopefunc', None) return orm.scoped_session( partial(_SignallingSession, self, **options), scopefunc=scopefunc )
self.session = self.create_scoped_session(session_options) 以及最后返回的
return orm.scoped_session(self.create_session(options), scopefunc=scopefunc)
可以看到使用的是sqlalchemy.orm.scoped_session
线程安全:scoped_session
结论:scoped_session会为每一个请求创建独立的session, 由线程id或者_app_ctx_stack.__ident_func__为标记
sqlalchemy的session对象
from sqlalchemy.orm import sessionmaker session = sessionmaker()
一般我们会通过sessionmaker()这个工厂函数创建session,但这个session并不能用在多线程中,为了支持多线程
操作,sqlalchemy提供了scoped_session,通过名字反映出scoped_session是通过某个作用域实现的
所以在多线程中一般都是如下使用session
from sqlalchemy.orm import scoped_session, sessionmaker session = scoped_session(sessionmaker())
我们来看看scoped_session是如何提供多线程环境支持的
class scoped_session(object): def __init__(self, session_factory, scopefunc=None): self.session_factory = session_factory if scopefunc: self.registry = ScopedRegistry(session_factory, scopefunc) else: self.registry = ThreadLocalRegistry(session_factory)
__init__中,session_factory是创建session的工厂函数,而sessionmaker就是一工厂函数(其实是定义了__call__的
函数)而scopefunc就是能产生某个作用域的函数,如果不提供将使用ThreadLocalRegistry
class ThreadLocalRegistry(ScopedRegistry): def __init__(self, createfunc): self.createfunc = createfunc self.registry = threading.local() def __call__(self): try: return self.registry.value except AttributeError: val = self.registry.v
从上面__call__可以看出,每次都会创建新的session,并发在线程本地变量中,你可能会好奇__call__是在哪里调用的?
def instrument(name): def do(self, *args, **kwargs): return getattr(self.registry(), name)(*args, **kwargs) return do
for meth in Session.public_methods:
setattr(scoped_session, meth, instrument(meth))
正如我们所看到的,当我们调用session.query将会调用 getattr(self.registry(), 'query'),self.registry()就是
调用__call__的时机,但是在flask_sqlalchemy中并没有使用ThreadLocalRegistry,创建scoped_session过程如下
# Which stack should we use? _app_ctx_stack is new in 0.9 connection_stack = _app_ctx_stack or _request_ctx_stack def __init__(self, app=None, use_native_unicode=True, session_options=None): session_options.setdefault( 'scopefunc', connection_stack.__ident_func__ ) self.session = self.create_scoped_session(session_options) def create_scoped_session(self, options=None): """Helper factory method that creates a scoped session.""" if options is None: options = {} scopefunc=options.pop('scopefunc', None) return orm.scoped_session( partial(_SignallingSession, self, **options), scopefunc=scopefunc )
我们看到scopefunc被设置为connection_stack.__ident_func__,而connection_stack就是flask中app上下文,
__ident_func__其实就是在多线程中就是thrading.get_ident,也就是线程id
我们看看ScopedRegistry是如何通过_操作的
class ScopedRegistry(object): def __init__(self, createfunc, scopefunc): self.createfunc = createfunc self.scopefunc = scopefunc self.registry = {} def __call__(self): key = self.scopefunc() try: return self.registry[key] except KeyError: return self.registry.setdefault(key, self.createfunc())
代码也很简单,其实也就是根据线程id创建对应的session对象,到这里我们基本已经了解了flask_sqlalchemy的线程安全原理。
1.flask_sqlalchemy能否使用ThreadLocalRegistry?
大部分情况都是可以的,但如果wsgi对多并发使用的是greenlet的模式就不适用了
2.上面create_scoped_session中partial是干嘛的?
前面我们说过scoped_session的session_factory是可调用对象,但_SignallingSession类并没有定义__call__,所以通过partial支持
这里说一下对db.relationship lazy的理解,看如下代码
class Role(db.Model): __tablename__ = 'roles' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) users = db.relationship('User', backref='role', lazy='dynamic') class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
假设role是已经获取的一个Role的实例
lazy:dynamic => role.users不会返回User的列表, 返回的是sqlalchemy.orm.dynamic.AppenderBaseQuery对象
当执行role.users.all()是才会真正执行sql,这样的好处就是可以继续过滤
lazy:select => role.users直接返回User实例的列表,也就是直接执行sql
注意:db.session.commit只有在对象有变化时才会真的执行update
两个比较重要的配置
app.config['SQLALCHEMY_ECHO'] = True =》配置输出sql语句
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True =》每次request自动提交db.session.commit()
这是通过app.teardown_appcontext注册实现
# 绑定app然后初始化sql配置 if app is not None: self.init_app(app) # 使用钩子,当请求结束后若没有配置自动提交,则移除此session @teardown def shutdown_session(response_or_exc): if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']: if response_or_exc is None: self.session.commit() self.session.remove() return response_or_exc # sqlalchemy.orm.scoping.scoped_session # sqlalchemy.util._collections.ScopedRegistry 定义 def clear(self): #Clear the current scope, if any. try: del self.registry[self.scopefunc()] except KeyError: pass
response_or_exc为异常值,默认为sys.exc_info()[1]
上面self.session.remove()表示每次请求后都会销毁self.session,不然会导致存放session的字段太大。