Celery
1、安装
pip3 install Django==2.0.4
pip3 install celery==4.3.0
pip3 install redis==3.2.1
pip3 install ipython==7.6.1
# 注意版本之间的兼容问题
2、测试
2.1 tasks.py文件进行验证
from celery import Celery
import time
app = Celery('TASK',
broker='redis://localhost',
backend='redis://localhost')
@app.task
def add(x, y):
print("running..add.", x, y)
return x + y
@app.task
def minus(x, y):
time.sleep(60)
print("running..minus.", x, y)
return x - y
2.2 启动celery worker 来开始监听并执行任务
celery -A tasks worker --loglevel=info # tasks是tasks.py文件:必须在tasks.py所在目录下执行
2.3 调用任务:再打开两个终端,进行命令行模式,调用任务
>>> import tasks
>>> import tasks
>>> t2 = tasks.minus.delay(9,11)
#然后在另一个终端重复上面步骤执行
>>> t1 = tasks.add.delay(3,4)
>>> t1.get() #由于t2执行sleep了3s所以t1.get()需要等待
2.4 celery其他命令
>>> t.ready() #返回true证明可以执行,不必等待
>>> t.get(timeout=1) #如果1秒不返回结果就超时,避免一直等待
>>> t.get(propagate=False) #如果执行的代码错误只会打印错误信息
>>> t.traceback #打印异常详细结果
3、在Django项目中如何使用Celery(celery无法在windowns下运行)
3.1 创建目录celery_task,并在celery_task下创建以下文件
# celery启动命令
celery -A celery_task worker -l INFO
# 启动celery定时任务运行
celery -A celery_task beat -l debug
# 文件名:celery
# -*- coding:utf8 -*-
from celery import Celery
from __future__ import absolute_import, unicode_literals
#1. absolute_import 可以使导入的celery是python绝对路基的celery模块,不是当前我们创建的celery.py
#2. unicode_literals 模块可能是python2和3兼容的,不知道
# 定义celery实例, 需要的参数, 1, 实例名, 2, 任务发布位置, 3, 结果保存位置
app = Celery('mycelery',
broker='redis://127.0.0.1:6379/2', # 任务存放的地方,6379redis默认端口
backend='redis://127.0.0.1:6379/3' # 结果存放的地方,6379redis默认端口
include=['celery_pro.tasks', #celery-pro是存放celery文件的文件夹名字
'celery_pro.tasks2',
])
#实例化时可以添加下面这个属性
app.conf.update(
result_expires=3600, #执行结果放到redis里,一个小时没人取就丢弃
)
# 1.添加django项目跟路径
import os, sys, django
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0,os.path.join(CELERY_BASE_DIR,'../opwf')) # opwf是项目名称
# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # 读取配置
# 如果报错不让超级管理员来启动,加入以下配置
from celery import platforms
platforms.C_FORCE_ROOT = True
# 定时任务配置
app.conf.update(
CELERYBEAT_SCHEDULE = {
# 每隔三分钟执行一次add函数
'every-3-min-add': {
'task': 'app01.tasks.add', # 执行的函数
'schedule': timedelta(seconds=180), # 每隔3分钟执行一次
'args':(1,2) # 测试时的一个固定参数
}
}
)
app.conf.timezone = 'Asia.Shanghai' # 定义时区,亚洲上海
if __name__ == '__main__':
app.start()
# 文件名:tasks
# -*- coding:utf8 -*-
from .celery import app
from .celery import CELERY_BASE_DIR
# 1.用add来做一个简单的测试
@app.task
def add(x, y):
return x + y