celery
功能描述
它是一个简单、灵活、可靠的用于处理大量消息的分布式系统。
功能主要有三个:执行异步任务,执行延迟任务,执行定时任务。
举个例子,你现在有两个项目、一个项目用于爬取数据,一个项目用于分析数据,如何在数据爬取后将任务交给另一个项目进行分析呢?这种场景下就可以使用celery
进行处理。
一个噩耗消息:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Celery是一个资金较少的项目,因此我们不支持Microsoft Windows。请不要提出与该平台有关的任何问题。
尽管官方提示不支持windows
,但是你仍然可以进行使用,这可能需要一些其他模块的辅助。
celery
是单独的服务,并不依赖于其他框架,就像Django
一样你只要安装了它就可以通过自身命令启动服务。
架构介绍
celery
架构由三部分组成,分别是消息中间件message broker
,任务执行单元worker
与任务执行结果存储task result store
,如下图所示:
celery
是一个独立运行的服务,内置socket
,如果想使用它你需要做这几件事情:
- 安装celery环境框架,配置broker与backend,启动celery服务
- 添加任务到borker,worker就会自动的在后台执行任务
- 任务执行完成后,通过backend获取结果
基本使用
安装使用
安装模块,我装的旧版,新版5.x
的有些摸不着头脑:
pip3 install celery==4.4.7
新建一个python
包,任意名字。
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
在celery.py
中配置borker
与backend
:
from celery import Celery
broker = "redis://127.0.0.1:6379/1" # broker任务队列
backend = "redis://127.0.0.1:6379/2" # 结构存储,执行完的结果存在这
# 如果有密码:"redis//:password@127.0.0.1:6379/2"
app = Celery(
__name__, # 取名,随便取
broker=broker,
backend=backend,
include=[
"celery_tasks.task", # 第一个任务,必须是包名.文件名
]
)
任务书写
在tasks.py
中开始书写任务:
from .celery import app
@app.task # 必须添加该装饰器
def add(x,y):
return x+y
@app.task
def sub(x,y):
return x-y
@app.task
def multi(x,y):
return x*y
任务执行
在add_task.py
中开始执行任务,三个任务分别指定三种不同的执行状态:
# 导入定义好的任务
from celery_task import tasks
# 添加异步任务,返回结果。任务号
t1_id = tasks.add.delay(10,20)
# 配置延迟、定时任务的时区为本地,如果延迟任务不生效,则取消本地时区的设置(windows下失效)
from celery_task.celery import app
# 时区
# app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
# app.conf.enable_utc = False
# 添加延迟任务,返回结果。任务号
from datetime import datetime,timedelta
time = datetime.utcnow() + timedelta(seconds=10) # 十秒后执行
t2_id = tasks.sub.apply_async(args=(100,50),eta=time)
# 添加定时任务,需要启动定时任务beat服务
from celery.schedules import crontab # 如果要定义其他的周期日期,导入这个
app.conf.beat_schedule = {
'multi-task': {
'task': 'celery_task.tasks.multi',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (20, 10),
}
}
获取结果
在get_result.py
中书写获取结果的代码:
from celery_task.celery import app
from celery.result import AsyncResult
id = 'a9ffd16c-dbe0-44d2-9317-b198b432273c' # 任务号
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
启动服务
接下来启动服务,首先切换到该包的上级目录中:
# cd project
# Linux
celery worker -A 模块名 -l info
# Windows
需要先安装eventlet模块
pip install eventlet
celery worker -A 包名 -l info -P eventlet
# 如果是定时任务,还需要启动beat服务
celery beat -A 包名 -l info
Django使用
基本使用
如果在Django
中要使用celery
,则需要将celery
项目建立在Django
项目的根目录下:
- DjangoProject01
- celery_project
- __init__.py
- celery.py
- django_app_name_task.py
- app01
- djangoproject01
同时,在任务中还需要导入Django
环境,一般书写在celery.py
文件中即可:
import os
import django
from celery import Celery
# 由于celery是独立的项目,所以必须导入django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "DjangoProject.settings")
django.setup()
broker = 'redis://127.0.0.1:6379/1' # broker任务队列
backend = 'redis://127.0.0.1:6379/2' # 结构存储,执行完的结果存在这
app=Celery(__name__,broker=broker,backend=backend,include=['celery_project.app01_task',])
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_project.app01_task.task01',
'schedule': timedelta(hours=4),
}
}