zoukankan      html  css  js  c++  java
  • flask_sqlalchemy的session线程安全源码解读

    flask_sqlalchemy是如何在多线程中对数据库操作不相互影响

    数据库操作隔离

    结论:使用scoped_session实现数据库操作隔离

    flask的api.route()接收一个请求,就会创建一个新的线程去处理,请求之间不相互影响

    flask_sqlalchemy是如何使用db.session使多个请求中保函的改变同一个表的sql操作不相互影响的

    在flask_sqlalchemy.SQLAlchemy类中关于session的定义:

    # Which stack should we use?  _app_ctx_stack is new in 0.9
    
    connection_stack = _app_ctx_stack or _request_ctx_stack
     
        def __init__(self, app=None,
                     use_native_unicode=True,
                     session_options=None):
            session_options.setdefault(
                'scopefunc', connection_stack.__ident_func__
            )
            self.session = self.create_scoped_session(session_options)
     
        def create_scoped_session(self, options=None):
            """Helper factory method that creates a scoped session."""
            if options is None:
                options = {}
            scopefunc=options.pop('scopefunc', None)
            return orm.scoped_session(
                partial(_SignallingSession, self, **options), scopefunc=scopefunc
            )

    self.session = self.create_scoped_session(session_options) 以及最后返回的

    return orm.scoped_session(self.create_session(options), scopefunc=scopefunc)

    可以看到使用的是sqlalchemy.orm.scoped_session

    线程安全:scoped_session
    结论:scoped_session会为每一个请求创建独立的session, 由线程id或者_app_ctx_stack.__ident_func__为标记

    sqlalchemy的session对象
     

    from sqlalchemy.orm import sessionmaker
    session = sessionmaker()

    一般我们会通过sessionmaker()这个工厂函数创建session,但这个session并不能用在多线程中,为了支持多线程
    操作,sqlalchemy提供了scoped_session,通过名字反映出scoped_session是通过某个作用域实现的
    所以在多线程中一般都是如下使用session

    from sqlalchemy.orm import scoped_session, sessionmaker
    session = scoped_session(sessionmaker())

    我们来看看scoped_session是如何提供多线程环境支持的

    class scoped_session(object):
        def __init__(self, session_factory, scopefunc=None):
            
            self.session_factory = session_factory
            if scopefunc:
                self.registry = ScopedRegistry(session_factory, scopefunc)
            else:
                self.registry = ThreadLocalRegistry(session_factory)

    __init__中,session_factory是创建session的工厂函数,而sessionmaker就是一工厂函数(其实是定义了__call__的

    函数)而scopefunc就是能产生某个作用域的函数,如果不提供将使用ThreadLocalRegistry

    class ThreadLocalRegistry(ScopedRegistry):
        def __init__(self, createfunc):
            self.createfunc = createfunc
            self.registry = threading.local()
     
        def __call__(self):
            try:
                return self.registry.value
            except AttributeError:
                val = self.registry.v

    从上面__call__可以看出,每次都会创建新的session,并发在线程本地变量中,你可能会好奇__call__是在哪里调用的?

    def instrument(name):
        def do(self, *args, **kwargs):
            return getattr(self.registry(), name)(*args, **kwargs)
        return do


     
    for meth in Session.public_methods:
        setattr(scoped_session, meth, instrument(meth))

    正如我们所看到的,当我们调用session.query将会调用 getattr(self.registry(), 'query'),self.registry()就是
    调用__call__的时机,但是在flask_sqlalchemy中并没有使用ThreadLocalRegistry,创建scoped_session过程如下

    # Which stack should we use?  _app_ctx_stack is new in 0.9
    connection_stack = _app_ctx_stack or _request_ctx_stack
     
        def __init__(self, app=None,
                     use_native_unicode=True,
                     session_options=None):
            session_options.setdefault(
                'scopefunc', connection_stack.__ident_func__
            )
            self.session = self.create_scoped_session(session_options)
     
        def create_scoped_session(self, options=None):
            """Helper factory method that creates a scoped session."""
            if options is None:
                options = {}
            scopefunc=options.pop('scopefunc', None)
            return orm.scoped_session(
                partial(_SignallingSession, self, **options), scopefunc=scopefunc
            )

    我们看到scopefunc被设置为connection_stack.__ident_func__,而connection_stack就是flask中app上下文,
    __ident_func__其实就是在多线程中就是thrading.get_ident,也就是线程id
    我们看看ScopedRegistry是如何通过_操作的

    class ScopedRegistry(object):
        def __init__(self, createfunc, scopefunc):
            self.createfunc = createfunc
            self.scopefunc = scopefunc
            self.registry = {}
     
     
        def __call__(self):
            key = self.scopefunc()
            try:
                return self.registry[key]
            except KeyError:
                return self.registry.setdefault(key, self.createfunc())

    代码也很简单,其实也就是根据线程id创建对应的session对象,到这里我们基本已经了解了flask_sqlalchemy的线程安全原理。
    1.flask_sqlalchemy能否使用ThreadLocalRegistry?
        大部分情况都是可以的,但如果wsgi对多并发使用的是greenlet的模式就不适用了
    2.上面create_scoped_session中partial是干嘛的?
        前面我们说过scoped_session的session_factory是可调用对象,但_SignallingSession类并没有定义__call__,所以通过partial支持

    这里说一下对db.relationship lazy的理解,看如下代码

    class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role', lazy='dynamic')
    
    
    class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

    假设role是已经获取的一个Role的实例
    lazy:dynamic => role.users不会返回User的列表, 返回的是sqlalchemy.orm.dynamic.AppenderBaseQuery对象
                    当执行role.users.all()是才会真正执行sql,这样的好处就是可以继续过滤

    lazy:select => role.users直接返回User实例的列表,也就是直接执行sql

    注意:db.session.commit只有在对象有变化时才会真的执行update

    两个比较重要的配置
    app.config['SQLALCHEMY_ECHO'] = True =》配置输出sql语句
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True =》每次request自动提交db.session.commit()
    这是通过app.teardown_appcontext注册实现

    # 绑定app然后初始化sql配置
    if app is not None:
        self.init_app(app)
        
    # 使用钩子,当请求结束后若没有配置自动提交,则移除此session 
    @teardown
            def shutdown_session(response_or_exc):
                if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                    if response_or_exc is None:
                        self.session.commit()
                self.session.remove()
                return response_or_exc
     
    # sqlalchemy.orm.scoping.scoped_session
    # sqlalchemy.util._collections.ScopedRegistry 定义
    def clear(self):
        #Clear the current scope, if any.
        try:
            del self.registry[self.scopefunc()]
        except KeyError:
            pass

    response_or_exc为异常值,默认为sys.exc_info()[1]
    上面self.session.remove()表示每次请求后都会销毁self.session,不然会导致存放session的字段太大。


    https://blog.csdn.net/luffyser/article/details/89380186

  • 相关阅读:
    层次遍历二叉树时的一个技巧
    合并两个有序链表
    关于指针的引用和“||”运算符的一些心得
    UE4中显示AI Debug信息
    EQS 自定义Context 如何用Testing Pawn 进行测试?
    4.16中Montage的一些变化
    Move Controller UE4键位
    EQS
    获取文件完整路径快捷方法
    同步引擎版本号的简易方法
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/4200340.html
Copyright © 2011-2022 走看看