目标:让一些耗时操作实现异步执行
员工:celery, redis, celery_worker
场景:现在从北京到上海,有一批货物要运送,一共6卡车的货物。第一种方法,就是客户(任务发布方)自己开一辆车从北京到上海,需要6天。第二种方案,把货卸载到快递网点(redis),把货物贴上序号(消息队列),快递员(celery)叫来三辆货车(celery_worker)依次拉货,需要2天。
举例:一个耗时任务,需要耗时3个小时 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
思路:将任务编号,组成一个队列,放到redis数据库中,然后让celery的进程去队列里依次执行这些函数,执行完,将结果返回给数据库中
步骤:
- 谁干:实例化celery
- 在哪干:redis数据库地址告诉celery
- 授权:哪些函数对象可以异步,用装饰器@celery.task授权这些函数可以
- 执行:耗时函数执行异步,调用delay方法
代码:
1 import time 2 from flask import Flask 3 from celery import Celery 4 5 app = Flask(__name__) 6 app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' 7 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' 8 9 # 一,谁干?实例化celery对象,让celery来干 10 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 11 # 二,哪里干?redis数据库地址告诉celery 12 celery.conf.update(app.config) 13 14 # 三,授权,celery.task表明哪些函数可以在celery worker里运行 15 @celery.task 16 def my_background_task(arg1): 17 time.sleep(arg1) 18 return arg1 19 20 21 @app.route('/', methods=['GET', "POST"]) 22 def hello_world(): 23 # 四,函数用delay方法表名函数是在celery worker中运行 24 celery_id=my_background_task.delay(30) 25 return "<h1>{}!<h1>".format(celery_id) 26 27 28 if __name__ == '__main__': 29 app.run()
celery的配置,也可以通过定义配置类来实现:
1 import time 2 from flask import Flask 3 from celery import Celery 4 5 6 class CeleryConfig(): 7 timezone = 'UTC' 8 BROKER_URL = 'redis://localhost:6379/0' # 消息队列存放地址 9 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # celery worker 执行结果返回存放地址 10 CELERY_TIMEZONE = 'Asia/Shanghai' # 时区 11 CELERY_ACKS_LATE = True # 只有当worker执行完任务后,才会告诉MQ,消息被消费。 12 CELERYD_FORCE_EXECV = True # 非常重要,有些情况下可以防止死锁 13 CELERY_IGNORE_RESULT = True # 忽略结果,不关心运行结果时可以关闭 14 CELERY_TASK_SERIALIZER = 'json' # 任务序列化方式 15 CELERY_DISABLE_RATE_LIMITS = True # 对任务消费的速率进行限制开关 16 CELERYD_PREFETCH_MULTIPLIER = 1 # worker预先获取任务数量 17 CELERYD_MAX_TASKS_PER_CHILD = 30 # worker最大执行任务数,超过数量销毁,防止内存泄漏等问题 18 CELERY_CREATE_MISSING_QUEUES = True # 队列不存在即创建 19 BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 7 * 24 * 60 * 60, 'max_retries': 1} # celery worker超时自动重启时间 20 CELERYD_CONCURRENCY = 3 # celery worker 最大并行数 21 22 23 app = Flask(__name__) 24 # 一,谁干?实例化celery对象,让celery来干 25 celery = Celery(app.name) 26 # 二,哪里干?redis数据库地址告诉celery 27 celery.config_from_object(CeleryConfig) 28 29 30 # 三,授权,celery.task表明哪些函数可以在celery worker里运行,time_limit是celery worker最大存活时间,单位是s,超时进程自杀 31 @celery.task(time_limit=3 * 60 * 60) 32 def my_background_task(arg1): 33 time.sleep(arg1) 34 return arg1 35 36 37 @app.route('/', methods=['GET', "POST"]) 38 def hello_world(): 39 # 四,函数用delay方法表名函数是在celery worker中运行 40 celery_id = my_background_task.delay(30) 41 return "<h1>{}!<h1>".format(celery_id) 42 43 44 if __name__ == '__main__': 45 app.run()