zoukankan      html  css  js  c++  java
  • flask中celery的使用

    目标:让一些耗时操作实现异步执行

    员工:celery, redis, celery_worker 

    场景:现在从北京到上海,有一批货物要运送,一共6卡车的货物。第一种方法,就是客户(任务发布方)自己开一辆车从北京到上海,需要6天。第二种方案,把货卸载到快递网点(redis),把货物贴上序号(消息队列),快递员(celery)叫来三辆货车(celery_worker)依次拉货,需要2天。

    举例:一个耗时任务,需要耗时3个小时 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。

    思路:将任务编号,组成一个队列,放到redis数据库中,然后让celery的进程去队列里依次执行这些函数,执行完,将结果返回给数据库中

    步骤:

    1. 谁干:实例化celery
    2. 在哪干:redis数据库地址告诉celery
    3. 授权:哪些函数对象可以异步,用装饰器@celery.task授权这些函数可以
    4. 执行:耗时函数执行异步,调用delay方法

    代码:

     1 import time
     2 from flask import Flask
     3 from celery import Celery
     4 
     5 app = Flask(__name__)
     6 app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
     7 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
     8 
     9 # 一,谁干?实例化celery对象,让celery来干
    10 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    11 # 二,哪里干?redis数据库地址告诉celery
    12 celery.conf.update(app.config)
    13 
    14 # 三,授权,celery.task表明哪些函数可以在celery worker里运行
    15 @celery.task
    16 def my_background_task(arg1):
    17     time.sleep(arg1)
    18     return arg1
    19 
    20 
    21 @app.route('/', methods=['GET', "POST"])
    22 def hello_world():
    23     # 四,函数用delay方法表名函数是在celery worker中运行
    24     celery_id=my_background_task.delay(30)
    25     return "<h1>{}!<h1>".format(celery_id)
    26 
    27 
    28 if __name__ == '__main__':
    29     app.run()

     celery的配置,也可以通过定义配置类来实现:

     1 import time
     2 from flask import Flask
     3 from celery import Celery
     4 
     5 
     6 class CeleryConfig():
     7     timezone = 'UTC'
     8     BROKER_URL = 'redis://localhost:6379/0'  # 消息队列存放地址
     9     CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # celery worker 执行结果返回存放地址
    10     CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区
    11     CELERY_ACKS_LATE = True  # 只有当worker执行完任务后,才会告诉MQ,消息被消费。
    12     CELERYD_FORCE_EXECV = True  # 非常重要,有些情况下可以防止死锁
    13     CELERY_IGNORE_RESULT = True  # 忽略结果,不关心运行结果时可以关闭
    14     CELERY_TASK_SERIALIZER = 'json'  # 任务序列化方式
    15     CELERY_DISABLE_RATE_LIMITS = True  # 对任务消费的速率进行限制开关
    16     CELERYD_PREFETCH_MULTIPLIER = 1  # worker预先获取任务数量
    17     CELERYD_MAX_TASKS_PER_CHILD = 30  # worker最大执行任务数,超过数量销毁,防止内存泄漏等问题
    18     CELERY_CREATE_MISSING_QUEUES = True  # 队列不存在即创建
    19     BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 7 * 24 * 60 * 60, 'max_retries': 1}  # celery worker超时自动重启时间
    20     CELERYD_CONCURRENCY = 3  # celery worker 最大并行数
    21 
    22 
    23 app = Flask(__name__)
    24 # 一,谁干?实例化celery对象,让celery来干
    25 celery = Celery(app.name)
    26 # 二,哪里干?redis数据库地址告诉celery
    27 celery.config_from_object(CeleryConfig)
    28 
    29 
    30 # 三,授权,celery.task表明哪些函数可以在celery worker里运行,time_limit是celery worker最大存活时间,单位是s,超时进程自杀
    31 @celery.task(time_limit=3 * 60 * 60)
    32 def my_background_task(arg1):
    33     time.sleep(arg1)
    34     return arg1
    35 
    36 
    37 @app.route('/', methods=['GET', "POST"])
    38 def hello_world():
    39     # 四,函数用delay方法表名函数是在celery worker中运行
    40     celery_id = my_background_task.delay(30)
    41     return "<h1>{}!<h1>".format(celery_id)
    42 
    43 
    44 if __name__ == '__main__':
    45     app.run()
    全世界的程序员们联合起来吧!
  • 相关阅读:
    计算几何模板1 点部分
    TTimerThread和TThreadedTimer(都是通过WaitForSingleObject和CreateEvent来实现的)
    dddd
    Ubuntu中查看硬盘分区UUID的方法(所有Linux目录的解释)
    VS2010对C++11的支持列表(感觉大部分都不支持)
    VC版本的MakeObjectInstance把WNDPROC映射到类的成员函数
    FpGrowth算法
    Go语言Web框架gwk介绍2
    页面缓存OutputCache
    jquery mobile扁平化设计样式--Jquery mobile Flat UI介绍
  • 原文地址:https://www.cnblogs.com/chaojiyingxiong/p/15058833.html
Copyright © 2011-2022 走看看