zoukankan      html  css  js  c++  java
  • Flask+Celery 异步任务

    Flask+Celery 异步任务

    一、 安装

    1. pip install redis
    2. pip install celery
      之所以要安装redis,是因为需要redis作为celery的消息中间件

    二、配置

    1. 新建一个celery_fun.py作为celery的配置文件,并把所有相关方法放进此py。
    # ============== create celery ==============
    from celery import Celery
    
    # 配置
    redis_url = f'redis://:{REDIS_INFO.get("password")}@{REDIS_INFO.get("host")}:{REDIS_INFO.get("port")}/{REDIS_INFO.get("celery_db")}'
    
    # 初始化celery
    celery_app = Celery("auto_app",
                        broker=redis_url,
                        backend=redis_url
                        )
    

    三、调用

    1. 先在celery.py定义需要使用celery的函数。
    @celery_app.task()
    def add_active(active_info):
        """
        异步添加信息
        :param active_info:
        :return:
        """
        # PyMongo 不是进程安全,所以子进程需要创建自己的连接
        mongo = MongoClient()
        mongo.add_data(MONGO_ACTIVE_TASK, active_info)
        mongo.close()
    
    1. 在Flask里接口进行调用
    @assist.route('add_active_info', methods=['POST'])
    def add_active_info():
        """添加信息"""
    	return_result={'ret_data':{}}
    	
    	# 调用celery
        celery_result = add_active.delay(active_info)
        
        return_result['ret_data']['celery_id'] = celery_result.id
        return jsonify({'messsage':return_result}),200
    
    
    1. 可以提供一个接口查询任务的状态
    @auto_app.route('/get_celery_state', methods=['POST'])
    def get_celery_state():
        info = request.get_json()
        celery_id = info.get('celery_id')
        result = AsyncResult(celery_id, app=celery_app)
        summary = {
            "state": result.state,
            "id": result.id,
        }
        return jsonify(summary), 200
    

    四、扩展

    1. delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 task在一分钟后再执行
    add_active.apply_async(active_info, countdown=60)
    
    1. delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
    2. bind为True,会传入self给被装饰的方法,这个参数会让 Celery 将 Celery 本身传入
    @celery.task(bind=True)
    

    五、启动

    1. redis需要开启服务
    2. celery启动命令celery -A celery_fun.celery_app worker --loglevel=info
    3. 启动flask
  • 相关阅读:
    认识ZooKeeper
    html5实现本页面元素拖放和本地文件拖放
    查询算法(一) 顺序查询与折半查询
    Shell排序算法和合并排序算法
    堆排序及修改选择排序
    Java实现冒泡排序,选择排序,插入排序
    穷举算法和递推算法(Java)
    由Spring框架中的单例模式想到的
    Java 线程面试题 Top 50(转 ImportNew)
    启动idea项目Debug模式时,报错Command line is too long
  • 原文地址:https://www.cnblogs.com/zy-mousai/p/14830412.html
Copyright © 2011-2022 走看看