Experiment goal:
Visiting http://127.0.0.1:8000/task_id/ returns a task ID: 068cd3cc-3572-46c0-9a33-f028772030b1
Visiting http://127.0.0.1:8000/task_result/?id=068cd3cc-3572-46c0-9a33-f028772030b1 with that task ID as the query parameter looks up the task and returns its result: 252
Experiment steps:
To use Celery in a Django project, add celery.py to the project directory, add tasks.py to the app directory, and update __init__.py and the settings configuration.
The directory structure is as follows (project name: proj, app name: app); a sketch of the layout is shown below.
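Only the files touched in this experiment are shown:

```
proj/                  # project root
├── manage.py
├── proj/              # project package
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   └── urls.py
└── app/               # application
    ├── tasks.py
    └── views.py
```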
proj/celery.py:

```python
from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

# This lets Celery pick up Django's settings from a standalone .py file
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')  # the name is arbitrary; usually the project name

# Celery is configured in Django's settings.py.
# namespace='CELERY' means every Celery-related setting uses the `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover the tasks.py file in every app, e.g.:
# - app1/
#     - tasks.py
#     - models.py
# - app2/
#     - tasks.py
#     - models.py
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
```
app/tasks.py:

```python
from __future__ import absolute_import, unicode_literals
import time

from celery import shared_task


@shared_task
def add(x, y):
    time.sleep(10)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
```
proj/__init__.py:

```python
from __future__ import absolute_import, unicode_literals

# Keep the Celery app loaded whenever Django starts,
# so that @shared_task can use it.
from .celery import app as celery_app

__all__ = ['celery_app']
```
proj/settings.py:

```python
# Append at the end of settings.py:
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'
```
Note that if any of the times involved concern the China region, add the timezone setting CELERY_TIMEZONE = 'Asia/Shanghai' to the configuration; Celery uses UTC by default.
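Putting broker, result backend, and timezone together, the Celery block at the end of settings.py looks roughly like this (a sketch, assuming Redis on the default port and database 0):

```python
# All of these are read through the CELERY_ namespace configured in celery.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'      # where task messages are queued
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # where task results are stored
CELERY_TIMEZONE = 'Asia/Shanghai'                   # default is UTC
```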
proj/urls.py:

```python
from django.contrib import admin
from django.urls import path

from app import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('task_id/', views.get_result_id),
    path('task_result/', views.get_result),
]
```
app/views.py:

```python
from django.shortcuts import HttpResponse
from celery.result import AsyncResult

from . import tasks


def get_result_id(request):
    # Enqueue the task and return its ID immediately, without waiting for the result
    t = tasks.add.delay(228, 24)
    return HttpResponse(t.id)


def get_result(request):
    task_id = request.GET.get('id', '')
    res = AsyncResult(id=task_id)
    if res.ready():
        return HttpResponse(res.get())
    else:
        return HttpResponse('Still running, the result will be available later...')
```
How to start everything:
1. Start the Redis service first:
`redis-server`
2. Start the Celery worker:
```bash
[root@localhost ~]# cd proj/   # must be run from the project directory, otherwise proj will not be found
[root@localhost proj]# celery -A proj worker -l debug

# You can also start several workers; the following starts two, w1 and w2:
[root@localhost proj]# celery multi start w1 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
	> w1@localhost.localdomain: OK
[root@localhost proj]# celery multi start w2 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
	> w2@localhost.localdomain: OK
```
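Workers started with `celery multi` can be shut down again with the matching stop command (a sketch; the node names must match the ones used at start time):

```bash
# Stop the w1 and w2 workers started above
celery multi stop w1 w2 -A proj
```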
3. Start the Django project:

```bash
[root@localhost ~]# cd proj/
[root@localhost proj]# python3 manage.py runserver 0.0.0.0:8000
```

Done! The two URLs above can now be visited successfully.
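A quick way to verify the whole flow from the command line (a sketch; substitute the task ID that the first request actually returns):

```bash
# 1. Kick off add(228, 24) and print the task ID
curl http://127.0.0.1:8000/task_id/
# -> 068cd3cc-3572-46c0-9a33-f028772030b1 (for example)

# 2. Query the result with that ID. For the first ~10 seconds the task is still
#    sleeping, so the "still running" message is returned; afterwards you get 252.
curl "http://127.0.0.1:8000/task_result/?id=068cd3cc-3572-46c0-9a33-f028772030b1"
# -> 252
```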
Running Celery periodic tasks in Django
1. Install django-celery-beat:
`pip3 install django-celery-beat`
2. Add django_celery_beat to INSTALLED_APPS:

```python
INSTALLED_APPS = [
    ...
    'django_celery_beat',
]
```
3. Define the periodic task in tasks.py:

```python
@shared_task
def sayhi(name):
    print("Hi, " + name)
```
4. Define the periodic tasks in the database tables.
First generate the tables:
`python3 manage.py makemigrations`
`python3 manage.py migrate`
The schedule entries themselves are then created in the django_celery_beat tables, for example through the Django admin or the ORM, as sketched below.
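A minimal sketch of creating a schedule entry through the django_celery_beat ORM, assuming the app is named app; the 10-second interval, the entry name, and the "world" argument are made up for illustration:

```python
import json

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# An interval of 10 seconds (assumed value for illustration)
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

# Register app.tasks.sayhi so beat enqueues it on that interval
PeriodicTask.objects.create(
    interval=schedule,
    name='say hi every 10 seconds',  # must be unique
    task='app.tasks.sayhi',
    args=json.dumps(['world']),
)
```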
Startup flow:
1. Start the Redis service: `redis-server`
2. Start the Celery worker: `celery -A proj worker -l debug` (run from the project directory)
3. Start the Django project: `python3 manage.py runserver 0.0.0.0:8000`
4. Start beat, which pushes due tasks to Redis on schedule: `celery -A proj beat -l info -S django`
Note: whenever a task is added or modified, celery beat must be restarted, otherwise the beat process will not pick up the new configuration.