安装 django + celery + django-celery-results
https://www.cnblogs.com/lanheader/p/13615772.html
安装 django-celery-beat
pip install django-celery-beat
配置 settings.py
# Register the Celery result backend and the beat scheduler apps with Django.
# One entry per line so the inline comments do not swallow the rest of the list.
INSTALLED_APPS = [
    # ... (keep the project's existing apps here)
    'django_celery_results',  # view Celery task results
    'django_celery_beat',  # pip install django-celery-beat
]
生成 Django-celery-beat 关联表
python manage.py migrate
app 目录 tasks.py 添加 task
import json
import requests
import datetime

# `shared_task` replaces the legacy `from celery import task`, which no longer
# imports on Celery 5.
from celery import shared_task
from django.core.cache import cache
from django.core.mail import send_mail
from django.utils import timezone

from .models import TodayViewPage


@shared_task
def task_send_dd_text(url, msg, atMoblies, atAll=False):
    """POST a text message to a webhook and return the response body.

    The payload shape (msgtype/text/at) looks like the DingTalk robot
    format -- presumably ``url`` is a DingTalk robot webhook; confirm
    against the caller.

    Args:
        url: Webhook URL the JSON payload is posted to.
        msg: Text placed in ``text.content``.
        atMoblies: Value sent as ``at.atMobiles``.
        atAll: Value sent as ``at.isAtAll``. Defaults to ``False``; the
            previous default was the misspelt string ``"flase"``.

    Returns:
        The response body as text.
    """
    body = {
        "msgtype": "text",
        "text": {
            "content": msg
        },
        "at": {
            "atMobiles": atMoblies,
            "isAtAll": atAll
        }
    }
    headers = {
        'content-type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0',
    }
    # Without a timeout a stalled endpoint would block the worker forever.
    r = requests.post(url, headers=headers, data=json.dumps(body), timeout=10)
    return r.text


@shared_task
def task_save_view_page():
    """Persist yesterday's cached page-view counter into ``TodayViewPage``.

    Reads the cache entry ``today_page_view_<day>`` for yesterday's day of
    month and stores it on the ``TodayViewPage`` row whose ``today`` field is
    yesterday, creating the row when none exists.

    Returns:
        ``"view page save success"`` when a truthy counter was found and
        saved, otherwise ``"view page save failed"``.
    """
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    # Use yesterday's own day number: `today.day - 1` is 0 on the 1st of a
    # month and never matches a real day.
    today_page_view = cache.get("today_page_view_%s" % yesterday.day)
    if not today_page_view:
        return "view page save failed"

    # first() returns None for an empty queryset, so no separate exists() query.
    page_view_obj = TodayViewPage.objects.filter(today=yesterday).first()
    if page_view_obj is None:
        page_view_obj = TodayViewPage(
            today=yesterday,
            view_page=today_page_view
        )
    else:
        page_view_obj.view_page = today_page_view
    page_view_obj.save()
    return "view page save success"
管理后台配置任务
启动 celery worker
# Run from the project root in a terminal ("joyoo" is the project name)
celery -A joyoo worker -l info

# Run the worker as a daemon
/root/.virtualenvs/blog/bin/celery multi start w1 -A joyoo -l info --logfile=./celerylog.log

# NOTE: periodic tasks configured in the admin are only dispatched while a beat
# process is also running, e.g.:
#   celery -A joyoo beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler