一、Celery异步任务框架
Celery是一个异步任务框架,并且是一个简单、灵活可靠的,处理大量消息的分布式系统
Celery服务为其他项目服务提供异步解决任务的需求,内置socket
Celery可执行的任务:执行异步任务,执行延迟任务,执行定时任务
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
二、Celery架构
Celery是由三部分组成的,消息中间件(message broker)、任务执行单元(worker)、任务结果存储(task result store)组成的。
Broker(任务中间件)————>Worker(任务执行者)————>Backend(任务结果仓库)
消息中间件:Celery是不提供消息服务的,但是可以使用第三方来提供消息服务(提供任务),列如,Redis。
任务执行单元:Worker会自动(后台异步)执行消息中间件(broker)中的任务任务。
任务结果存储:将Worker执行的结果存储在backend中,可以使用Redis来存储
三、Celery任务结构
Celery有两种任务结构,基本结构、包架构封装,但是我们提倡使用包架构封装,因为结构更加清晰,例如:
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
包架构封装
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # broker 任务队列,任务放到这里面
backend = 'redis://127.0.0.1:6379/2' # backend 结果存储,执行结果放在这里面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def multip(x, y):
return x * y
异步任务执行:
add_task.py
把 tasks.py 中的任务函数添加到 broker 中
windows 首先需要安装:pip install celery 和 pip install eventlet
需要启动 celery, 在包项目下输入以下命令:
C:project> celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
from celery_task.tasks import add
# 提交异步任务
ret = add.delay(5, 3) # 把add函数任务添加到 broker中,worker在异步实时取出执行
print(ret) # 0cc72e56-4604-4c00-bb3d-5b456f4869a7 获取执行结果需要此ID
延迟任务执行:
add_task.py
还是需要先启动celery
from celery_task.tasks import multip
# 提交延迟任务
from datetime import datetime, timedelta
# 需要UTC时间
eta = datetime.utcnow() + timedelta(seconds=10) # 当前UTC时间往后加10秒
ret = multip.apply_async(args=(9, 9), eta=eta) # 10 秒之后执行
print(ret) # 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2 获取执行结果需要此ID
定时任务执行:
执行定时任务需要从新配置celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://127.0.0.1:6379/1' # broker 任务队列,任务放到这里面
backend = 'redis://127.0.0.1:6379/2' # backend 任务队列,执行结果放在这里面
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
app.conf.beat_schedule = {
# add 任务
'add-task': {
'task': 'celery_task.tasks.add',
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点执行一次
'args': (300, 150),
},
# multip 任务
'multip-task': {
'task': 'celery_task.tasks.multip',
'schedule': timedelta(seconds=3), # 每三秒执行一次
'args': (300, 150),
}
}
启动 worker 等待执行任务
celery -A celery_task beat -l info
celery -A 包名 beat -l info
启动 beat 将任务添加 broker 中,让worker执行
celery -A celery_task worker -P eventlet -l info
celery -A 包名 worker -P eventlet -l info
查看任务执行结果:
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'
if __name__ == '__main__':
asyncs = AsyncResult(id=id, app=app)
if asyncs.successful():
result = asyncs.get()
print(result) # 成功则取出backend中id对应的值
elif asyncs.failed():
print('任务失败')
elif asyncs.status == 'PENDING':
print('任务等待中被执行')
elif asyncs.status == 'RETRY':
print('任务异常后正在重试,或id不存在')
elif asyncs.status == 'STARTED':
print('任务已经开始被执行')
基本结构
创建py文件:celery_app_task.py
from celery import Celery
import time
# backend='redis://:123456@127.0.0.1:6379/1' # 有密码123456
broker = 'redis://127.0.0.1:6379/1' # broker 任务队列,任务放到这里面
backend = 'redis://127.0.0.1:6379/2' # backend 任务队列,执行结果放在这里面
app = Celery(__name__, broker=broker, backend=backend)
@app.task
def add(x, y):
return x + y
启动 worker
celery -A celery_app_task worker -P eventlet -l info
添加任务:add_task.py
from celery_app_task import add
# 提交任务
ret = add.delay(5, 3) # 往 broker 中添加一个任务
print(ret)