zoukankan      html  css  js  c++  java
  • Flask-SqlAchemy源码之session详解解决连接gone away问题


    • 线上连接gone away问题

    我们业务中需要执行一个时间超过8小时的脚本,然后发生了连接gone away的问题,然后开始找原因,首先我们的伪代码如下:

    app = create_app()
    def prod_script():
        with app.app_context():
            machine_objs = DiggerMapper.get_all()
            time.sleep(8*3600)   # 替代线上执行比较久的代码
            machine_objs = DiggerMapper.get_all()   # 再执行这次数据库查询时发生连接gone away的报错
        return True
    

    通过排查原因基本定位为,在执行代码的时候本次连接时间比较久,超过了mysql的默认最长空闲连接时间8小时就被断开了,所以再次进行数据库查询时就报错连接gone away。

    • 解决方案

    ok,知道了错误发生的原因,我们的基本思路就是,给每次查询结束以后我们就把数据库连接放回连接池中,然后下次要使用时再次从连接池中取连接就好了。代码如下:

    app = create_app()
    def prod_script():
        with app.app_context():
            machine_objs = DiggerMapper.get_all()
            db.session.close()
            time.sleep(8*3600)   # 替代线上执行比较久的代码
            machine_objs = DiggerMapper.get_all()  
        return True
    

    那么这样就解决问题了吗?我们知道mysql在空闲连接超过8小时后会主动断开连接,所以连接池中的空闲连接在超过8小时后也是会被mysql断开的,那么下次进行数据库操作时依然会继续报错,我们还需要加上一条设置放在config文件中的SQLALCHEMY_POOL_RECYCLE = 3600 这条设置的意思是让连接池每隔一小时重新和mysql建立连接,这样我们取到连接都是可用连接了。

    通过以上两步就基本上可以解决连接gone away问题了。

    那么都到这一步了, 知其然要知其所以然,都是这个session在进行操作,那么这个session是怎么工作的呢?我们去找找它的源码。

    • session源码

    我们在创建app前通常会先实例化SQLAlchemy,然后执行init_app(app),那么找session我们先看它实例化的代码做了什么工作:

    db = SQLAlchemy()
    
    def create_app():
       app = Flask(__name__)
       db.init_app(app)
       return app
    
    
    class SQLAlchemy(object):
        
        def __init__(self, app=None, use_native_unicode=True, session_options=None,
                     metadata=None, query_class=BaseQuery, model_class=Model,
                     engine_options=None):
    
            self.use_native_unicode = use_native_unicode
            self.Query = query_class
            self.session = self.create_scoped_session(session_options)
            self.Model = self.make_declarative_base(model_class, metadata)
            self._engine_lock = Lock()
            self.app = app
            self._engine_options = engine_options or {}
            _include_sqlalchemy(self, query_class)
    
            if app is not None:
                self.init_app(app)
    

    我们看到session是create_scoped_session(session_options)生成的,然后进入这个方法看到:

        def create_scoped_session(self, options=None):
    
            if options is None:
                options = {}
    
            scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
            options.setdefault('query_cls', self.Query)
            return orm.scoped_session(
                self.create_session(options), scopefunc=scopefunc
            )
    

    上面看到相当于session是由 orm.scoped_session方法生成的,其中self.create_session(options)方法在被调用时默认生成一个SignallingSession对象,我们再看看orm.scoped_session方法发生了什么:

    class scoped_session(object):
        def __init__(self, session_factory, scopefunc=None):
    
            self.session_factory = session_factory
    
            if scopefunc:
                self.registry = ScopedRegistry(session_factory, scopefunc)
            else:
                self.registry = ThreadLocalRegistry(session_factory)
    
     # self.registry 类似如下
     self.registry = {"线程id": SignallingSession, "线程id2": SignallingSession}
    

    好吧,scoped_session是一个类,实例化以后有个属性self.registry,如果之前有看过flask源码知道像请求上下文和应用上下文一样,它是一个基于线程安全的字典,每一个线程有自己的session类。

    这是你可能有个疑问既然这里返回的是一个scoped_session对象,但是类中并没有query等方法那么db.session.query(), db.session.close()这些是怎么来的,答案就是在文件下还有一段立即执行的代码如下:

    ScopedSession = scoped_session
    """Old name for backwards compatibility."""
    
    
    def instrument(name):
        def do(self, *args, **kwargs):
            return getattr(self.registry(), name)(*args, **kwargs)
    
        return do
    
    
    for meth in Session.public_methods:
        setattr(scoped_session, meth, instrument(meth))
    

    这段代码的意思就是把SqlAchemy的Session类中有的所有方法,也给上面我们提到默认SignallingSession对象加上,这样我们就可以调用query等方法了。并且也相当于每次执行数据库操作时都会创建或者共用之前创建的session连接,

    然后我们在代码中从来没有主动关闭连接的代码,flask-sqlachemy是怎么帮我们关闭连接的呢?答案是它里面在请求完成后主动帮我们关闭了,在init_app的代码中有如下代码:

        def init_app(self, app):
            """This callback can be used to initialize an application for the
            use with this database setup.  Never use a database in the context
            of an application not initialized that way or connections will
            leak.
            """
            if (
                'SQLALCHEMY_DATABASE_URI' not in app.config and
                'SQLALCHEMY_BINDS' not in app.config
            ):
                warnings.warn(
                    'Neither SQLALCHEMY_DATABASE_URI nor SQLALCHEMY_BINDS is set. '
                    'Defaulting SQLALCHEMY_DATABASE_URI to "sqlite:///:memory:".'
                )
    
            app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
            app.config.setdefault('SQLALCHEMY_BINDS', None)
            app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None)
            app.config.setdefault('SQLALCHEMY_ECHO', False)
            app.config.setdefault('SQLALCHEMY_RECORD_QUERIES', None)
            app.config.setdefault('SQLALCHEMY_POOL_SIZE', None)
            app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None)
            app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', None)
            app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None)
            app.config.setdefault('SQLALCHEMY_COMMIT_ON_TEARDOWN', False)
            track_modifications = app.config.setdefault(
                'SQLALCHEMY_TRACK_MODIFICATIONS', None
            )
            app.config.setdefault('SQLALCHEMY_ENGINE_OPTIONS', {})
    
            if track_modifications is None:
                warnings.warn(FSADeprecationWarning(
                    'SQLALCHEMY_TRACK_MODIFICATIONS adds significant overhead and '
                    'will be disabled by default in the future.  Set it to True '
                    'or False to suppress this warning.'
                ))
    
            # Deprecation warnings for config keys that should be replaced by SQLALCHEMY_ENGINE_OPTIONS.
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_SIZE', 'pool_size')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_TIMEOUT', 'pool_timeout')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_POOL_RECYCLE', 'pool_recycle')
            utils.engine_config_warning(app.config, '3.0', 'SQLALCHEMY_MAX_OVERFLOW', 'max_overflow')
    
            app.extensions['sqlalchemy'] = _SQLAlchemyState(self)
    
            @app.teardown_appcontext
            def shutdown_session(response_or_exc):
                if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                    warnings.warn(
                        "'COMMIT_ON_TEARDOWN' is deprecated and will be"
                        " removed in version 3.1. Call"
                        " 'db.session.commit()'` directly instead.",
                        DeprecationWarning,
                    )
    
                    if response_or_exc is None:
                        self.session.commit()
    
                self.session.remove()
                return response_or_exc
    

    其中shutdown_session方法就是在请求结束后,主动帮我们关闭session连接放回连接池中。

    • 总结

    综上源码所述,解释了

    • 为什么我们可以随时关闭session连接,因为每次数据库操作都可以重新创建连接
    • 为什么设置SQLALCHEMY_POOL_RECYCLE 配置,因为让连接保持活跃不被mysql断开
    • 遇到执行比较久的代码怎么做,在适当的时候我们主动关闭session连接放回连接池中保证连接的可用
    公众号 种树飞编程 欢迎关注
  • 相关阅读:
    1014 Waiting in Line (30)(30 point(s))
    1013 Battle Over Cities (25)(25 point(s))
    1012 The Best Rank (25)(25 point(s))
    1011 World Cup Betting (20)(20 point(s))
    1010 Radix (25)(25 point(s))
    1009 Product of Polynomials (25)(25 point(s))
    1008 Elevator (20)(20 point(s))
    1007 Maximum Subsequence Sum (25)(25 point(s))
    1006 Sign In and Sign Out (25)(25 point(s))
    1005 Spell It Right (20)(20 point(s))
  • 原文地址:https://www.cnblogs.com/lifei01/p/14598930.html
Copyright © 2011-2022 走看看