celery定时任务
参考博客:https://www.cnblogs.com/xiaonq/p/14097376.html
1.项目目录结构图:
(~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~)
2. opwf_project/celery_task文件夹
# -*- coding: utf-8 -*-
import os, sys, time, django
from celery import Celery
# 1.添加django项目根路径
# celery项目中的所有导包地址, 都是以CELERY_BASE_DIR为基准设定.
# 执行celery命令时, 也需要进入CELERY_BASE_DIR目录执行.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))
# 2.添加django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup() # 读取配置
# 定义celery实例, 需要的参数, 1, 实例名, 2, 任务发布位置, 3, 结果保存位置
# 3.celery基本配置
app = Celery('mycelery',
broker='redis://127.0.0.1:6379/14', # 任务存放的地方
backend='redis://127.0.0.1:6379/15', # 结果存放的地方
include=[
'celery_task.tasks', #定时任务
'celery_task.task2',
]) # 自己写的定时任务
# 4.实例化时可以添加下面这个属性 这个在真实项目中最好不要写
app.conf.update(
result_expires=3600, # 执行结果放到redis里,一个小时没人取就丢弃
)
# 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'celery_task.tasks.test_task_crontab',
'schedule': 5.0, # 5秒执行一次
'args': (16, 16) # 传参
},
}
# 6.添加时区配置 一定要写这个时区 TUC会慢8个小时
app.conf.timezone = 'Asia/Shanghai'
if __name__ == '__main__':
app.start()
(~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~)
task.py文件 定义定时任务函数的
# -*- coding:utf8 -*-
from .celery import app
import time,random
@app.task
def randnum(start,end):
time.sleep(3)
return random.randint(start,end)
tasks2.py
3、在django项目中调用
# 1.导入任务
from celery_task import tasks
# 2.执行异步任务
tasks.send_sms_code.delay(18538752511,())
(~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~)
命令管理
'''1、celery管理 '''
celery multi start celery_test -A celery_test -l debug --autoscale=50,5
# celery并发数:最多50个,最少5个
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9
# 关闭所有celery进程
'''2、django-celery-beat心跳服务管理 '''
celery -A celery_test beat -l info -S django #启动心跳任务
ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null # 杀死心跳所有进程