zoukankan      html  css  js  c++  java
  • celery 与 flask 实现异步任务调度

        Flask 定了2中上下文,来实现机遇线程协程的,wsgi服务的请求(request、session)和存储(g,current_app )过程,通过栈来完成不同线程和协程的上下文切换,在与celery相结合处理异步任务时,需要保证异步任务在同一个上下文中执行,需要对celery进行重构, 避免出现:

    ile "/opt/xAssets/venv/lib/python3.6/site-packages/celery/app/trace.py", line 412, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/opt/xAssets/venv/lib/python3.6/site-packages/celery_context/model.py", line 42, in __call__
        with current_app.app_context():
      File "/opt/xAssets/venv/lib/python3.6/site-packages/werkzeug/local.py", line 347, in __getattr__
        return getattr(self._get_current_object(), name)
      File "/opt/xAssets/venv/lib/python3.6/site-packages/werkzeug/local.py", line 306, in _get_current_object
        return self.__local()
      File "/opt/xAssets/venv/lib/python3.6/site-packages/flask/globals.py", line 51, in _find_app
        raise RuntimeError(_app_ctx_err_msg)
    RuntimeError: Working outside of application context.
    
    This typically means that you attempted to use functionality that needed
    to interface with the current application object in some way. To solve
    this, set up an application context with app.app_context().  See the
    documentation for more information.

    celery 的使用

    •  应用:
    from celery import Celery
    
    celery = Celery('xassets',
                    broker=settings.CELERY_BROKER_URL,
                    include="xassets.tasks",
                    backend=settings.CELERY_RESULT_BACKEND,
                    )
    •  celery 以 默认方式(perfork)
    # 紧接上文

      from celery.signals import worker_process_init

    TaskBase = celery.Task
    
    
    class ContextTask(TaskBase):
        abstract = True
    
        def __call__(self, *args, **kwargs):
            with current_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    
    
    celery.Task = ContextTask
    
    
    @worker_process_init.connect
    def init_celery_flask_app(**kwargs):
        """Create the Flask app after forking a new worker.
    
        This is to make sure no resources are shared between processes.
        """
        app = create_app()
        app.app_context().push()

    # 注意: worker_process_init 信号,仅用于perfork方式启动,如果以gevent,greelet 或者thread方式启动,这此处不执行,也就是会缺少缺少app实例化

    •  以gevent方式启动(大多数人的选择)
    TaskBase = celery.Task
    
    app = create_app()
    
    
    class ContextTask(TaskBase):
        abstract = True
    
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    
    
    celery.Task = ContextTask

    # 注意:1. 需要提前创建 flask 上下文应用实例

                2. 重定义task时,建议使用app.app_context

    celery_context 封装一键化应用

    from celery_context import Celery
    
    celery = Celery('xassets',
                    broker=settings.CELERY_BROKER_URL,
                    include="xassets.tasks",
                    backend=settings.CELERY_RESULT_BACKEND,
                    )
    
    app = create_app()
    celery.reload_task(app)
    # 或者 
    # celery.init_app(app)
  • 相关阅读:
    [jQuery]jQuery DataTables插件自定义Ajax分页实现
    [.NET Core].NET Core R2安装教程及Hello示例
    PHP openssl加密扩展使用总结
    PHP 运行方式(PHP SAPI介绍)
    SQL用法操作合集
    PHP mcrypt加密扩展使用总结
    PHP header函数的几大作用
    JS中的Navigator 对象
    数据在内存中存储的方式:大端模式与小端模式
    C++中各种数据类型占据字节长度
  • 原文地址:https://www.cnblogs.com/spaceapp/p/14501918.html
Copyright © 2011-2022 走看看