zoukankan      html  css  js  c++  java
  • flask_sqlalchemy的session线程安全源码解读

    flask_sqlalchemy是如何在多线程中对数据库操作不相互影响

    数据库操作隔离

    结论:使用scoped_session实现数据库操作隔离

    flask的api.route()接收一个请求,就会创建一个新的线程去处理,请求之间不相互影响

    flask_sqlalchemy是如何使用db.session使多个请求中保函的改变同一个表的sql操作不相互影响的

    在flask_sqlalchemy.SQLAlchemy类中关于session的定义:

    # Which stack should we use?  _app_ctx_stack is new in 0.9
    
    connection_stack = _app_ctx_stack or _request_ctx_stack
     
        def __init__(self, app=None,
                     use_native_unicode=True,
                     session_options=None):
            session_options.setdefault(
                'scopefunc', connection_stack.__ident_func__
            )
            self.session = self.create_scoped_session(session_options)
     
        def create_scoped_session(self, options=None):
            """Helper factory method that creates a scoped session."""
            if options is None:
                options = {}
            scopefunc=options.pop('scopefunc', None)
            return orm.scoped_session(
                partial(_SignallingSession, self, **options), scopefunc=scopefunc
            )

    self.session = self.create_scoped_session(session_options) 以及最后返回的

    return orm.scoped_session(self.create_session(options), scopefunc=scopefunc)

    可以看到使用的是sqlalchemy.orm.scoped_session

    线程安全:scoped_session
    结论:scoped_session会为每一个请求创建独立的session, 由线程id或者_app_ctx_stack.__ident_func__为标记

    sqlalchemy的session对象
     

    from sqlalchemy.orm import sessionmaker
    session = sessionmaker()

    一般我们会通过sessionmaker()这个工厂函数创建session,但这个session并不能用在多线程中,为了支持多线程
    操作,sqlalchemy提供了scoped_session,通过名字反映出scoped_session是通过某个作用域实现的
    所以在多线程中一般都是如下使用session

    from sqlalchemy.orm import scoped_session, sessionmaker
    session = scoped_session(sessionmaker())

    我们来看看scoped_session是如何提供多线程环境支持的

    class scoped_session(object):
        def __init__(self, session_factory, scopefunc=None):
            
            self.session_factory = session_factory
            if scopefunc:
                self.registry = ScopedRegistry(session_factory, scopefunc)
            else:
                self.registry = ThreadLocalRegistry(session_factory)

    __init__中,session_factory是创建session的工厂函数,而sessionmaker就是一工厂函数(其实是定义了__call__的

    函数)而scopefunc就是能产生某个作用域的函数,如果不提供将使用ThreadLocalRegistry

    class ThreadLocalRegistry(ScopedRegistry):
        def __init__(self, createfunc):
            self.createfunc = createfunc
            self.registry = threading.local()
     
        def __call__(self):
            try:
                return self.registry.value
            except AttributeError:
                val = self.registry.v

    从上面__call__可以看出,每次都会创建新的session,并发在线程本地变量中,你可能会好奇__call__是在哪里调用的?

    def instrument(name):
        def do(self, *args, **kwargs):
            return getattr(self.registry(), name)(*args, **kwargs)
        return do


     
    for meth in Session.public_methods:
        setattr(scoped_session, meth, instrument(meth))

    正如我们所看到的,当我们调用session.query将会调用 getattr(self.registry(), 'query'),self.registry()就是
    调用__call__的时机,但是在flask_sqlalchemy中并没有使用ThreadLocalRegistry,创建scoped_session过程如下

    # Which stack should we use?  _app_ctx_stack is new in 0.9
    connection_stack = _app_ctx_stack or _request_ctx_stack
     
        def __init__(self, app=None,
                     use_native_unicode=True,
                     session_options=None):
            session_options.setdefault(
                'scopefunc', connection_stack.__ident_func__
            )
            self.session = self.create_scoped_session(session_options)
     
        def create_scoped_session(self, options=None):
            """Helper factory method that creates a scoped session."""
            if options is None:
                options = {}
            scopefunc=options.pop('scopefunc', None)
            return orm.scoped_session(
                partial(_SignallingSession, self, **options), scopefunc=scopefunc
            )

    我们看到scopefunc被设置为connection_stack.__ident_func__,而connection_stack就是flask中app上下文,
    __ident_func__其实就是在多线程中就是thrading.get_ident,也就是线程id
    我们看看ScopedRegistry是如何通过_操作的

    class ScopedRegistry(object):
        def __init__(self, createfunc, scopefunc):
            self.createfunc = createfunc
            self.scopefunc = scopefunc
            self.registry = {}
     
     
        def __call__(self):
            key = self.scopefunc()
            try:
                return self.registry[key]
            except KeyError:
                return self.registry.setdefault(key, self.createfunc())

    代码也很简单,其实也就是根据线程id创建对应的session对象,到这里我们基本已经了解了flask_sqlalchemy的线程安全原理。
    1.flask_sqlalchemy能否使用ThreadLocalRegistry?
        大部分情况都是可以的,但如果wsgi对多并发使用的是greenlet的模式就不适用了
    2.上面create_scoped_session中partial是干嘛的?
        前面我们说过scoped_session的session_factory是可调用对象,但_SignallingSession类并没有定义__call__,所以通过partial支持

    这里说一下对db.relationship lazy的理解,看如下代码

    class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role', lazy='dynamic')
    
    
    class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

    假设role是已经获取的一个Role的实例
    lazy:dynamic => role.users不会返回User的列表, 返回的是sqlalchemy.orm.dynamic.AppenderBaseQuery对象
                    当执行role.users.all()是才会真正执行sql,这样的好处就是可以继续过滤

    lazy:select => role.users直接返回User实例的列表,也就是直接执行sql

    注意:db.session.commit只有在对象有变化时才会真的执行update

    两个比较重要的配置
    app.config['SQLALCHEMY_ECHO'] = True =》配置输出sql语句
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True =》每次request自动提交db.session.commit()
    这是通过app.teardown_appcontext注册实现

    # 绑定app然后初始化sql配置
    if app is not None:
        self.init_app(app)
        
    # 使用钩子,当请求结束后若没有配置自动提交,则移除此session 
    @teardown
            def shutdown_session(response_or_exc):
                if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                    if response_or_exc is None:
                        self.session.commit()
                self.session.remove()
                return response_or_exc
     
    # sqlalchemy.orm.scoping.scoped_session
    # sqlalchemy.util._collections.ScopedRegistry 定义
    def clear(self):
        #Clear the current scope, if any.
        try:
            del self.registry[self.scopefunc()]
        except KeyError:
            pass

    response_or_exc为异常值,默认为sys.exc_info()[1]
    上面self.session.remove()表示每次请求后都会销毁self.session,不然会导致存放session的字段太大。


    https://blog.csdn.net/luffyser/article/details/89380186

  • 相关阅读:
    【Codeforces 349B】Color the Fence
    【Codeforces 459D】Pashmak and Parmida's problem
    【Codeforces 467C】George and Job
    【Codeforces 161D】Distance in Tree
    【Codeforces 522A】Reposts
    【Codeforces 225C】Barcode
    【Codeforces 446A】DZY Loves Sequences
    【Codeforces 429B】Working out
    【Codeforces 478C】Table Decorations
    【Codeforces 478C】Table Decorations
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/4200340.html
Copyright © 2011-2022 走看看