一.简介
Celery是一个异步任务的调度工具。 Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。 在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。 这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。
我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。 Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。
它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,
使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
二.Celery 的架构
Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。我们需要一个消息队列来下发我们的任务。
首先要有一个消息中间件,此处选择rabbitmq (也可选择 redis 或 Amazon Simple Queue Service(SQS)消息队列服务)。推荐 选择 rabbitmq 。使用RabbitMQ是官方特别推荐的方式。它的架构组成如下图:
可以看到,Celery 主要包含以下几个模块:
-
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
-
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
-
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
-
任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。
各个中间件的比较:
三.安装及初步使用
pip install celery #安装celery
小试牛刀:1. 定义任务函数。2. 运行celery服务。3. 客户应用程序的调用。
编写一个tasks.py的文件
# encoding: utf-8 from celery import Celery #任务调度名称,指定中间件和结果存储的位置 app = Celery('tasks',broker="redis://127.0.0.1:6379/0",backend='redis://127.0.0.1:6379/1') @app.task def add(x,y): return x + y
上述代码导入了celery,然后创建了celery 实例 app,实例化的过程中指定了任务名tasks
(和文件名一致),传入了broker和backend。然后创建了一个任务函数add
。
运行:
celery -A task worker -l info -P eventlet
这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别,-P这个参数是celery4.x之后要使用的,否则会报错。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,
worker界面展示信息如下:
如果你不想使用 celery 命令来启动 worker,可直接使用文件来驱动,修改task.py (增加入口函数main)
if __name__ == '__main__': app.start()
再执行
python task.py worker
终端调用(需要在tasks.py同级目录下运行):
D: omcelery λ python Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> from tasks import add >>> r=add.delay(3,4) >>> print(r.get()) 7 >>> print(r.result) 7 >>> print(r.ready())
到redis看一下结果:
另外一种方法:
启用任务和的调度都使用脚本
功能:模拟一个耗时操作,并打印 worker 所在机器的 IP 地址,中间人和结果存储都使用 redis 数据库
#encoding=utf-8 #filename my_first_celery.py from celery import Celery import time import socket app = Celery('tasks', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' ) def get_host_ip(): """ 查询本机ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 s = x + y print("主机IP {}: x + y = {}".format(get_host_ip(),s)) return s
启动这个 worker:
celery -A my_first_celery worker -l info -P eventlet
这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,worker界面展示信息如下:
调用任务:
在 my_first_celery.py 的同级目录下编写如下脚本 start_task.py如下。
#encoding=utf-8 from my_first_celery import add #导入我们的任务函数add import time result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行 while not result.ready():# 循环检查任务是否执行完毕 print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get()) #获取任务的返回结果 print(result.successful()) #判断任务是否成功执行
执行
python start_task.py
结果如下所示:
发现等待了大约3秒钟后,任务返回了结果24,并且是成功完成,此时worker界面增加的信息如下:
这里的信息非常详细,其中2004447e-1183-4d65-ae5d-bd8ead70e216是taskid,只要指定了 backend,根据这个 taskid 可以随时去 backend 去查找运行结果,使用方法如下:
>>> from my_first_celery import add >>> taskid='2004447e-1183-4d65-ae5d-bd8ead70e216' >>> add.AsyncResult(taskid).get() 24 >>> #或者 >>> from celery.result import AsyncResult >>> AsyncResult(taskid).get() 24
重要说明:如果想远程执行 worker 机器上的作业,请将 my_first_celery.py 和 start_tasks.py 复制到远程主机上(需要安装
celery),修改 my_first_celery.py 指向同一个中间人和结果存储,再执行 start_tasks.py 即可远程执行 worker 机器上的作业。my_first_celery.add函数的代码不是必须的,你也要以这样调用任务:
from my_first_celery import app app.send_task("my_first_celery.add",args=(1,3))
四.第一个 celery 项目
在生产环境中往往有大量的任务需要调度,单独一个文件是不方便的,celery 当然支持模块化的结构,我这里写了一个用于学习的 Celery 小型工程项目,含有队列操作,任务调度等实用操作,目录树如下所示:
![](https://upload-images.jianshu.io/upload_images/12989993-047943a185b34898.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/189/format/webp)
app.py
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom from celery import Celery app = Celery("myCeleryProj", include=["myCeleryProj.tasks"]) app.config_from_object("myCeleryProj.settings") if __name__ == "__main__": app.start()
settings.py
#!/usr/bin/env python # -*- codin # g: utf-8 -*- #author tom from kombu import Queue import re from datetime import timedelta from celery.schedules import crontab CELERY_QUEUES = ( # 定义任务队列 Queue("default", routing_key="task.#"), # 路由键以“task.”开头的消息都进default队列 Queue("tasks_A", routing_key="A.#"), # 路由键以“A.”开头的消息都进tasks_A队列 Queue("tasks_B", routing_key="B.#"), # 路由键以“B.”开头的消息都进tasks_B队列 ) CELERY_TASK_DEFAULT_QUEUE = "default" # 设置默认队列名为 default CELERY_TASK_DEFAULT_EXCHANGE = "tasks" CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic" CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default" CELERY_ROUTES = ( [ ( re.compile(r"myCeleryProj.tasks.(taskA|taskB)"), {"queue": "tasks_A", "routing_key": "A.import"}, ), # 将tasks模块中的taskA,taskB分配至队列 tasks_A ,支持正则表达式 ( "myCeleryProj.tasks.add", {"queue": "default", "routing_key": "task.default"}, ), # 将tasks模块中的add任务分配至队列 default ], ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.*", {"queue": "default"}), # 将tasks模块中的所有任务分配至队列 default # ], # ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default # ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A # ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B # ], # ) BROKER_URL = "redis://127.0.0.1:6379/0" # 使用redis 作为消息代理 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # 任务结果存在Redis CELERY_RESULT_SERIALIZER = "json" # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显 CELERYBEAT_SCHEDULE = { "add": { "task": "myCeleryProj.tasks.add", "schedule": timedelta(seconds=10), "args": (10, 16), }, "taskA": { "task": "myCeleryProj.tasks.taskA", "schedule": crontab(hour=21, minute=10), }, "taskB": { "task": "myCeleryProj.tasks.taskB", "schedule": crontab(hour=21, minute=12), }, }
readme.txt
#启动 worker #分别在三个终端窗口启动三个队列的worker,执行命令如下所示: celery -A myCeleryProj.app worker -Q default -l info celery -A myCeleryProj.app worker -Q tasks_A -l info celery -A myCeleryProj.app worker -Q tasks_B -l info #当然也可以一次启动多个队列,如下则表示一次启动两个队列tasks_A,tasks_B。 celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info #则表示一次启动两个队列tasks_A,tasks_B。 #最后我们再开启一个窗口来调用task: 注意观察worker界面的输出 >>> from myCeleryProj.tasks import * >>> add.delay(4,5);taskA.delay();taskB.delay() #同时发起三个任务 <AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474> <AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6> <AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab> #也可以使用下面的方法调用task >>> from myCeleryProj.app import app >>> app.send_task(myCeleryProj.tasks.add,args=(4,5) >>> app.send_task(myCeleryProj.tasks.taskA) >>> app.send_task(myCeleryProj.tasks.taskB)
转自:https://blog.csdn.net/somezz/article/details/82343346
五.管理与监控
Celery管理和监控功能是通过flower组件实现的,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理。
安装使用
pip3 install flower
启动
flower -A project --port=5555 # -A :项目目录 #--port 指定端口
访问http:ip:5555
api使用,例如获取woker信息
curl http://127.0.0.1:5555/api/workers
结果: