一、Celery介绍
1、Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时,也支持任务调度
注意:1、Celery目前不支持widows,用在widows可能或报错
2、celery服务可以不依赖任何服务器,通过自身命令启动服务
3、celery服务为为其他项目服务提供异步解决任务需求的
4、会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
二、Celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker) 和任务执行结果存储(task result store) 组成
1、消息中间件:
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
2、任务执行单元:
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
3、任务结果存储:
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
4、应用场景:
异步执行:解决耗时任务,比如发送短信、邮件、音频处理等
延迟执行:解决延迟任务
定时执行:解决周期任务,比如每天数据统计
三、Celery的使用
1、pip install celery
2、消息中间件:RabbitMQ/Redis
broker='redis://127.0.0.1:6379/3' #任务中间件 #有密码 backend='redis://:12345@127.0.0.1:6379/4' #任务结果仓库 #无密码 # backend='redis://127.0.0.1:6379/1'
3、创建app: app=Celery('任务名',broker=broker,backend=backend)
4、启动celery服务:
# 非windows 命令:celery worker -A 执行文件名 -l info # windows: pip3 install eventlet celery worker -A 执行文件名 -l info -P eventlet
注意:celery5.0后,windows上该命令会报错,执行的时候,需要切换到selery项目的目录下才可执行
四、Celery执行异步、延迟、定时任务
基本结构
手动提交任务,手动获取结果
worker对应的文件 celery_task.py
from celery import Celery broker='redis://:12345@127.0.0.1:6379/3' backend='redis://:12345@127.0.0.1:6379/4' app=Celery(__name__,broker=broker,backend=backend) @app.task def add(x,y): return x+y
添加任务的py文件
from celery_task import add add.delay(3,4)
执行命令:celery worker -A celery_task -l info -P eventlet
多任务结构(包架构封装)
使用方法:先在project目录下,执行该命令:
celery worker -A celery_task(包名) -l info -P eventlet 开启celery服务
再右键执行add_task.py(添加任务的py文件)
project ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
clery.py文件:
# -*-coding:utf-8 -*- from celery import Celery broker='redis://:12345@127.0.0.1:6379/3' backend='redis://:12345@127.0.0.1:6379/4' app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.tasks',])
tasks.py文件:
from .celery import app @app.task def add(n,m): return n+m @app.task def mulp(n,m): return n*m
add_task.py文件:
# -*-coding:utf-8 -*- from celery_task.tasks import add,mulp from datetime import datetime,timedelta print('执行任务了') print('add',add,'mulp',mulp) #添加异步任务 ret1=add.delay(10,20) ret2=mulp.delay(2,5) ''' 添加延迟任务 #需要utc时间 utc时间:datetime.utcnow() 延迟10秒:seconds=10 时间类型:timedelta()
eta:是datetime类型 ''' eta=datetime.utcnow()+timedelta(seconds=10) # 延迟任务使用的是 apply_async ret3=add.apply_async(args=(29,11),eta=eta)
get_result.py文件:(换上想要获取结果的任务id号,右键运行该文件就可以)
# -*-coding:utf-8 -*- from celery_task.celery import app from celery.result import AsyncResult #任务id号 id = 'e3e462a3-08bb-4e1b-9b3d-51f4554de118' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
定时任务
跟异步任务不同之处:
1、celery.py文件中定时任务的配置,
2、不用手动提交任务,而是借助beat来定时提交任务。
# -*-coding:utf-8 -*- from datetime import timedelta from celery.schedules import crontab from celery import Celery broker='redis://:12345@127.0.0.1:6379/3' backend='redis://:12345@127.0.0.1:6379/4' app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.tasks',]) # 定时执行任务的配置 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用utc (跟时区配置时配套使用的) app.conf.enable_utf=False # 任务的定时配置 app.conf.beat_schedule={ #自定义方法名:add-task 'add-task':{ 'task':'celery_task.tasks.add', #定时时间 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (100,22), } }
执行过程:先开启celery服务,再开启beat
beat服务启动命令:celery beat -A celery_task -l info
五、Django中使用celery
''' celery框架django项目工作流程 1)加载django配置环境 2)创建Celery框架对象app,配置broker和backend,得到的app就是worker 3)给worker对应的app添加可处理的任务函数,用include配置给worker的app 4)完成提供的任务的定时配置app.conf.beat_schedule 5)启动celery服务,运行worker,执行任务 6)启动beat服务,运行beat,添加任务 重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下 '''