一、celery概述
1.1、celery定义
celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。
1.2、各模块版本
模块 | 版本 |
python | 3.6.12 |
django | 2.2.17 |
celery | 5.0.5 |
django-celery-beat | 2.1.0 |
django-celery-results | 2.0.0 |
1.3、参考
https://www.cnblogs.com/wdliu/p/9530219.html
https://www.cnblogs.com/f-g-f/p/11300656.html
二、纯代码celery
2.1、目录结构
celery01
├── dy.py
└── proj01
├── celeryconfig.py
├── celery.py
├── __init__.py
└── tasks.py
2.2、代码
# celeryconfig.py # celery配置,队列和结果存放地址,序列化格式 BROKER_URL = 'redis://8.136.184.235:6479' CELERY_RESULT_BACKEND = 'redis://8.136.184.235:6479/0' CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# celery.py # 实例化celery,生成实例对象 from __future__ import absolute_import from celery import Celery app = Celery('proj01', include=["proj01.tasks"]) app.config_from_object("proj01.celeryconfig") if __name__ == "__main__": app.start()
# tasks.py # celery的具体功能代码,此处为add函数 from __future__ import absolute_import from time import sleep from proj01.celery import app @app.task def add(x, y): sleep(1) return x + y
# __init__.py # 空文件,无内容
# dy.py from proj01.tasks import add import time t1 = time.time() r1 = add.delay(1, 2) r2 = add.delay(2, 4) r3 = add.delay(3, 6) r4 = add.delay(4, 8) r5 = add.delay(5, 10) r_list = [r1, r2, r3, r4, r5] for r in r_list: while not r.ready(): pass print('task:', r) print(r.result) t2 = time.time() print('共耗时:%s' % str(t2-t1))
2.3、运行代码
启动celery服务:
在项目目录下,此项目目录是/root/celery01,执行如下命令:
celery -A proj01.celery worker -l info
调用服务:
在项目目录下,此项目目录是/root/celery01,执行如下命令:
python dy.py
2.4、结果
dy.py运行结果,如下图所示:
celery服务运行结果,如下图所示:
三、纯代码celery定时任务
3.1、概述
实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。
3.2、代码变动
tip:在上一步(第二章 纯代码 Celery)的代码基础上改动,红色为变动内容
# celery.py # 添加定时任务的功能,硬编码添加定时任务 from __future__ import absolute_import from celery import Celery from datetime import timedelta from celery.schedules import crontab app = Celery('proj01', include=["proj01.tasks"]) app.config_from_object("proj01.celeryconfig") ## 定时任务方式一 app.conf.CELERYBEAT_SCHEDULE = { 'add': { 'task': 'proj01.tasks.add', 'schedule': timedelta(seconds=5), 'args': (16, 16) } } # crontab配置方法:https://www.cnblogs.com/alex3714/p/6351797.html ## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来 #app.conf.beat_schedule = { # # Executes every Monday morning at 7:30 a.m. # 'add': { # 'task': 'proj01.tasks.add', # 'schedule': crontab(hour=7, minute=30, day_of_week=1), # 'args': (16, 16), # }, #} if __name__ == "__main__": app.start()
3.3、运行代码:
启动celery服务:
在项目目录下,此项目目录是/root/celery01,执行如下命令:
celery -A proj01.celery worker -l info --beat
3.4、结果
会按照定时设置,自动执行任务。
四、django Celery
4.1、目录结构
红色为新增或改动的文件
proj02
├── app02
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── models.py
│ ├── tasks.py # 应用下,会被异步调用的函数
│ ├── tests.py
│ └── views.py
├── manage.py
└── proj02
├── celery.py # 实例化celery,生成celery实例对象
├── __init__.py
├── settings.py # celery配置
├── urls.py
└── wsgi.py
4.2、代码
# setting.py # 基础配置(apps、db)这里就不说了 ... # 省略其他配置内容 CELERY_TIMEZONE = "Asia/Shanghai" CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60 BROKER_URL = 'redis://127.0.0.1:6479' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6479/0' CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# celery.py # 实例化celery对象,生成celery实例对象 from __future__ import absolute_import, unicode_literals import os from django.conf import settings from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj02.settings') app = Celery('proj02') # 此处不能用namespace,否则会出错 # app.config_from_object('django.conf:settings', namespace='CELERY') app.config_from_object('django.conf:settings') app.autodiscover_tasks(settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
# tasks.py # 定义异步任务函数 from celery import shared_task @shared_task def add(x, y): print(x+y) return x + y
# app02/views.py # 定义视图函数,调用tasks.py里的异步函数 from django.http import JsonResponse from app02 import tasks def index(request, *args, **kwargs): res=tasks.add.delay(1,3) return JsonResponse({'status':'successful','task_id':res.task_id}) # proj02/urls.py from django.contrib import admin from django.urls import path from app02.views import index urlpatterns = [ path('admin/', admin.site.urls), path('', index), ]
4.3、运行代码
启动django服务:
在项目目录下,此项目目录是/root/proj02,执行如下命令:
python manage.py runserver 0.0.0.0:8888
启动celery服务:
在项目目录下,此项目目录是/root/proj02,执行如下命令:
celery -A proj02.celery worker -l info
调用接口:
http://<django服务器地址ip>:8888
4.4、结果
{"status": "successful", "task_id": "fa1dc915-b1c2-4250-8557-2172d00febf1"}
扩展:task结果存储
CELERY_RESULT_BACKEND
1、概述
除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作。
2、配置
安装依赖:
pip install django-celery-results
配置settings.py,注册app:
INSTALLED_APPS = (
...,
'django_celery_results',
)
修改backend配置,将redis改为django-db:
#CELERY_RESULT_BACKEND = 'redis://127.0.0.0:6479/0'
CELERY_RESULT_BACKEND = 'django-db'
同步数据库:
python manage.py migrate django_celery_results
3、使用
查看数据库表(django_celery_results),每个任务执行后的结果,存储在这里。
五、django Celery定时任务
5.1、 概述
实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。
5.2、代码变动
tip:在上一步(第四章 django Celery)的代码基础上改动,红色为变动内容
# proj02/celery_schedule.py # 定时任务执行的功能函数 from datetime import timedelta from celery.schedules import crontab ## 定时任务方式一 CELERYBEAT_SCHEDULE = { 'add': { 'task': 'app02.tasks.add', 'schedule': timedelta(seconds=10), 'args': (16, 16) } } ## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来 #CELERYBEAT_SCHEDULE = { # 'add': { # 'task': 'app02.tasks.add', # 'schedule': crontab(hour=7, minute=30, day_of_week=1), # 'args': (16, 16) # } #}
# proj02/settings.py # 新增对celery_schedule.py文件的引用 from proj02.celery_schedule import CELERYBEAT_SCHEDULE INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'app02', 'django_celery_beat', 'django_celery_results', # 这个是本页`扩展`章节的内容 ]
5.3、运行代码
启动django服务:
在项目目录下,此项目目录是/root/proj02,执行如下命令:
python manage.py runserver 0.0.0.0:8888
启动celery服务:
在项目目录下,此项目目录是/root/proj02,执行如下命令:
celery -A proj02.celery worker -l info --beat
5.4、结果
会按照定时设置,自动执行任务。
六、django管理后端自定义定时任务
6.1、启动服务
启动django服务:
在项目目录下,此项目目录是/root/proj02,执行如下命令:
python manage.py runserver 0.0.0.0:8888
启动celery服务(特别注意启动时加上参数,红色标记):
在项目目录下,此项目目录是/root/proj02,执行如下命令:
celery -A proj02.celery worker -l info --beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
6.2、界面配置使用
登录管理后台,看到此界面,如下:
详细配置,不具体介绍了,自己点点试试,很快就能知道。
七、自定义动态定时任务
7.1、概述
django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。
7.2、实例代码
后续补充
八、celery监控
https://www.cnblogs.com/zivli/p/11517797.html
后续补充