Celery是Python开发的分布式任务调度模块, 处理异步
任务队列:是一种在线程或机器间发配任务的机制。
消息列队:消息列队的输入是工作的一个单元,称为任务,独立的职程(worker)进程持续监听列队是否需要处理的新任务
Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。
Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
Celery 是用 Python 编写的,但协议可以用任何语言实现。迄今,已有 Ruby 实现的 RCelery 、node.js 实现的 node-celery 以及一个 PHP 客户端 ,语言互通也可以通过 using webhooks 实现。4
celery 两大功能
任务发布 任务调度
任务存入 redis消息代理broker 任务消费worker 储存数据库
这儿 -- 任务发布和[任务消费worker]通过celery, [消息代理broker]和储存结果通过 redis
启动命令
celery worker -A celery_test_module -l info -c 3 -B
定时任务:
# 任务调度 CELERYBEAT_SCHEDULE = { # 定时秒 # 'redis_manage':{ # 'task':'celery_test_module.celery_tasks.run', # 'schedule': timedelta(seconds=5) # }, # 定时任务minute及以上 redis_manage2取的名字 'redis_manage2': { 'task': 'celery_test_module.celery_tasks.run', 'schedule': crontab(minute=4) # 每小时的第多少分钟 } }
触发任务 delay
func.delay(*args) # 执行一次celert接受一次(消费任务)
可以开启多个 worker celery worker -A celery_test_module -l info -c 3 -B
多个消费任务 就 通过 celery的Broker分配给多个worker消费
上面的命令行实际上启动的是Worker,如果要放到后台运行,可以扔给supervisor。
具体配置celery
1.任务列队 celery_tasks.py
#coding=utf-8 from __future__ import absolute_import import time import redis from celery_test_module.celery import celery_test conn = redis.Redis() # celery_test Celery实例 @celery_test.task def add(a=1, b=3): time.sleep(2) return a+b @celery_test.task def run(): return 50
2.配置文件config.py
#coding=utf-8 from __future__ import absolute_import # 可以导入同名模块 ps:本地存在 from datetime import timedelta from celery.schedules import crontab BROKER_URL = 'redis://127.0.0.1/2' # 存任务地址 CELERY_RESULT_BACKEND = 'redis://127.0.0.1/3' # 存结果地址 CELERY_TIMEZONE='Asia/Shanghai' # 上海时间 # 任务调度 定时任务才需要配置这个 CELERYBEAT_SCHEDULE = { # 定时秒 # 'redis_manage':{ # 'task':'celery_test_module.celery_tasks.run', 所有的配置 实例 task py文件都在celery_test_module 下 run是func # 'schedule': timedelta(seconds=5) # }, # 定时任务minute及以上 redis_manage2取的名字 'redis_manage2': { 'task': 'celery_test_module.celery_tasks.add', # 这儿计时任务,对add计时. 只需要启动celery到时间自动执行 add方法 'schedule': crontab(minute=55) # 每小时的第多少分钟 } }
3.实例celery.py
#coding=utf-8 from __future__ import absolute_import from celery import Celery celery_test = Celery('celery_tornado', include=['celery_test_module.celery_tasks']) # 实例celery 参数 enyname+task主体 celery_test.config_from_object('celery_test_module.celery_config') # 加载配置 if __name__ == '__main__': celery_test.start()
4.触发任务
在celery_test_model外的 test_chufa.py
from celery_test_module.celery_tasks import add import time for i in xrange(50): add.delay(i + 1, 0) # celery的delay time.sleep(1)
启动celery worker -A celery_test_model -l info -c 3 -B -l日志级别 -c数量 -B定时任务
触发任务 通过 python test_chufa.py 启动