1.1 安装celery
pip install Django==2.2
pip install celery==4.4.7
pip install redis==3.5.3
2 新建 配置celery
3 配置celery六步 (重点)
# -*- coding: utf-8 -*-
# celery.py
# -*- coding: utf-8 -*-
from datetime import timedelta
from celery import Celery
import os,sys
import django
# 1.添加django项目根路径
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings")
django.setup() # 读取配置
# 3.celery基本配置
app = Celery('proj',
broker='redis://localhost:6379/14',
backend='redis://localhost:6379/15',
include=['celery_task.tasks',
'celery_task.tasks2',
])
# 4.实例化时可以添加下面这个属性
app.conf.update(
result_expires=3600, #执行结果放到redis里,一个小时没人取就丢弃
)
# 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'celery_task.tasks.send_overtime',
'schedule': 10000.0,
'args': (16, 16)
},
}
# 6.添加时区配置
app.conf.timezone = 'UTC'
if __name__ == '__main__':
app.start()
4.task基础配置
from .celery import app #从当前目录导入app
import os,sys
from .celery import CELERY_BASE_DIR
from django.core.mail import send_mail
from django.conf import settings
#1.test_task_crontab 测试定时任务
from celery_task.celery import app
import time, random
from workflow.models import *
from django.db.models import Q
@app.task
def test_task_crontab(start,end):
time.sleep(5)
# Create your tests here.
print("定时任务即将耗时--------------------111111111111111111111")
return random.randint(start,end)
@app.task(bind=True)
def send_sms_code(self, username,email,text):
# time.sleep(60)
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
# 在方法中导包
from utils.task import send_mail_task
time.sleep(5)
try:
# 用 res 接收发送结果, 成功是:0, 失败是:-1
res = send_mail_task(username, email,text)
except Exception as e:
res = '-1'
if res == '-1':
# 如果发送结果是 -1 就重试.
self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))
@app.task
def send_overtime(start,end):
from django.utils import timezone
now = timezone.now()
# start_time = now - timezone.timedelta(days=7)
start_time = now - timezone.timedelta(hours=12) # 查询10天前的数据
end_time = now
qs = SubOrder.objects.filter(Q(approve_ts__range=(start_time, end_time)) & Q(suborder_status='1'))
send_mail(subject='工单超时',
message='jinwu提交的工单即将超时,请尽快审批',
from_email=settings.EMAIL_FROM, # 发送者邮箱
recipient_list=['wangsai_python@163.com',], # 接收者邮箱可以写多个
fail_silently=False)
return random.randint(start,end)
5.测试celery
5.1 启动celery
'''1.启动celery'''
#1.1 单进程启动
celery celery -A main worker -l INFO
#1.2 celery管理
celery multi start celery_test -A celery_test -l debug --autoscale=50,5
#celery并发数:最多50个,最少5个
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 关闭所有celery进程