zoukankan      html  css  js  c++  java
  • Flask-SqlAchemy源码之session详解解决连接gone away问题


    • 线上连接gone away问题

    我们业务中需要执行一个时间超过8小时的脚本,然后发生了连接gone away的问题,然后开始找原因,首先我们的伪代码如下:

    app = create_app()
    def prod_script():
        with app.app_context():
            machine_objs = DiggerMapper.get_all()
            time.sleep(8*3600)   # 替代线上执行比较久的代码
            machine_objs = DiggerMapper.get_all()   # 再执行这次数据库查询时发生连接gone away的报错
        return True
    

    通过排查原因基本定位为,在执行代码的时候本次连接时间比较久,超过了mysql的默认最长空闲连接时间8小时就被断开了,所以再次进行数据库查询时就报错连接gone away。

    • 解决方案

    ok,知道了错误发生的原因,我们的基本思路就是,给每次查询结束以后我们就把数据库连接放回连接池中,然后下次要使用时再次从连接池中取连接就好了。代码如下:

    app = create_app()
    def prod_script():
        with app.app_context():
            machine_objs = DiggerMapper.get_all()
            db.session.close()
            time.sleep(8*3600)   # 替代线上执行比较久的代码
            machine_objs = DiggerMapper.get_all()  
        return True
    

    那么这样就解决问题了吗?我们知道mysql在空闲连接超过8小时后会主动断开连接,所以连接池中的空闲连接在超过8小时后也是会被mysql断开的,那么下次进行数据库操作时依然会继续报错,我们还需要加上一条设置放在config文件中的SQLALCHEMY_POOL_RECYCLE = 3600 这条设置的意思是让连接池每隔一小时重新和mysql建立连接,这样我们取到连接都是可用连接了。

    通过以上两步就基本上可以解决连接gone away问题了。

    那么都到这一步了, 知其然要知其所以然,都是这个session在进行操作,那么这个session是怎么工作的呢?我们去找找它的源码。

    • session源码

    我们在创建app前通常会先实例化SQLAlchemy,然后执行init_app(app),那么找session我们先看它实例化的代码做了什么工作:

    db = SQLAlchemy()
    
    def create_app():
       app = Flask(__name__)
       db.init_app(app)
       return app
    
    
    class SQLAlchemy(object):
        
        def __init__(self, app=None, use_native_unicode=True, session_options=None,
                     metadata=None, query_class=BaseQuery, model_class=Model,
                     engine_options=None):
    
            self.use_native_unicode = use_native_unicode
            self.Query = query_class
            self.session = self.create_scoped_session(session_options)
            self.Model = self.make_declarative_base(model_class, metadata)
            self._engine_lock = Lock()
            self.app = app
            self._engine_options = engine_options or {}
            _include_sqlalchemy(self, query_class)
    
            if app is not None:
                self.init_app(app)
    

    我们看到session是create_scoped_session(session_options)生成的,然后进入这个方法看到:

        def create_scoped_session(self, options=None):
    
            if options is None:
                options = {}
    
            scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
            options.setdefault('query_cls', self.Query)
            return orm.scoped_session(
                self.create_session(options), scopefunc=scopefunc
            )
    

    上面看到相当于session是由 orm.scoped_session方法生成的,其中self.create_session(options)方法在被调用时默认生成一个SignallingSession对象,我们再看看orm.scoped_session方法发生了什么:

    class scoped_session(object):
        def __init__(self, session_factory, scopefunc=None):
    
            self.session_factory = session_factory
    
            if scopefunc:
                self.registry = ScopedRegistry(session_factory, scopefunc)
            else:
                self.registry = ThreadLocalRegistry(session_factory)
    
     # self.registry 类似如下
     self.registry = {"线程id": SignallingSession, "线程id2": SignallingSession}
    

    好吧,scoped_session是一个类,实例化以后有个属性self.registry,如果之前有看过flask源码知道像请求上下文和应用上下文一样,它是一个基于线程安全的字典,每一个线程有自己的session类。

    这是你可能有个疑问既然这里返回的是一个scoped_session对象,但是类中并没有query等方法那么db.session.query(), db.session.close()这些是怎么来的,答案就是在文件下还有一段立即执行的代码如下:

    ScopedSession = scoped_session
    """Old name for backwards compatibility."""
    
    
    def instrument(name):
        def do(self, *args, **kwargs):
            return getattr(self.registry(), name)(*args, **kwargs)
    
        return do
    
    
    for meth in Session.public_methods:
        setattr(scoped_session, meth, instrument(meth))
    

    这段代码的意思就是把SqlAchemy的Session类中有的所有方法,也给上面我们提到默认SignallingSession对象加上,这样我们就可以调用query等方法了。并且也相当于每次执行数据库操作时都会创建或者共用之前创建的session连接,

    然后我们在代码中从来没有主动关闭连接的代码,flask-sqlachemy是怎么帮我们关闭连接的呢?答案是它里面在请求完成后主动帮我们关闭了,在init_app的代码中有如下代码:

        def init_app(self, app):
            """This callback can be used to initialize an application for the
            use with this database setup.  Never use a database in the context
            of an application not initialized that way or connections will
            leak.
            """
            if (
                'SQLALCHEMY_DATABASE_URI' not in app.config and
                'SQLALCHEMY_BINDS' not in app.config
            ):
                warnings.warn(
                    'Neither SQLALCHEMY_DATABASE_URI nor SQLALCHEMY_BINDS is set. '
                    'Defaulting SQLALCHEMY_DATABASE_URI to "sqlite:///:memory:".'
                )
    
            app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
            app.config.setdefault('SQLALCHEMY_BINDS', None)
            app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None)
            app.config.setdefault('SQLALCHEMY_ECHO', False)
            app.config.setdefault('SQLALCHEMY_RECORD_QUERIES', None)
            app.config.setdefault('SQLALCHEMY_POOL_SIZE', None)
            app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None)
            app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', None)
            app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None)
            app.config.setdefault('SQLALCHEMY_COMMIT_ON_TEARDOWN', False)
            track_modifications = app.config.setdefault(
                'SQLALCHEMY_TRACK_MODIFICATIONS', None
            )
            app.config.setdefault('SQLALCHEMY_ENGINE_OPTIONS', {})
    
            if track_modifications is None:
                warnings.warn(FSADeprecationWarning(
                    'SQLALCHEMY_TRACK_MODIFICATIONS adds significant overhead and '
                    'will be disabled by default in the future.  Set it to True '
                    'or False to suppress this warning.'
                ))
    
            # Deprecation warnings for config keys that should be replaced by SQLALCHEMY_ENGINE_OPTIONS.
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_SIZE', 'pool_size')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_TIMEOUT', 'pool_timeout')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_RECYCLE', 'pool_recycle')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_MAX_OVERFLOW', 'max_overflow')
    
            app.extensions['sqlalchemy'] = _SQLAlchemyState(self)
    
            @app.teardown_appcontext
            def shutdown_session(response_or_exc):
                if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                    warnings.warn(
                        "'COMMIT_ON_TEARDOWN' is deprecated and will be"
                        " removed in version 3.1. Call"
                        " 'db.session.commit()'` directly instead.",
                        DeprecationWarning,
                    )
    
                    if response_or_exc is None:
                        self.session.commit()
    
                self.session.remove()
                return response_or_exc
    

    其中shutdown_session方法就是在请求结束后,主动帮我们关闭session连接放回连接池中。

    • 总结

    综上源码所述,解释了

    • 为什么我们可以随时关闭session连接,因为每次数据库操作都可以重新创建连接
    • 为什么设置SQLALCHEMY_POOL_RECYCLE 配置,因为让连接保持活跃不被mysql断开
    • 遇到执行比较久的代码怎么做,在适当的时候我们主动关闭session连接放回连接池中保证连接的可用
    公众号 种树飞编程 欢迎关注
  • 相关阅读:
    工厂模式 ioc dom4j 反射之我的一点理解
    hibernate中注解方式中的控制反转
    java中的数据存储(堆,栈) 很重要
    hibernate中映射关系总结
    三极管使用方法
    OC OD介绍
    HP Jack介绍
    Jlink接口的Jtag和SWD接口定义
    什么是域什么是工作组
    Ubuntu安装.run文件
  • 原文地址:https://www.cnblogs.com/lifei01/p/14598930.html
Copyright © 2011-2022 走看看