1.安装
pip install django-celery pip install flower
2.部署RabbitMQ:
1.下载下载并安装erlang原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。 1.下载地址:http://www.erlang.org/downloads 2.安装路径:D:Worktoolserl 3.环境变量: set ERLANG_HOME = D:Worktoolserl; set PATH=%Path%;%ERLANG_HOME%in; 2.安装下载并安装RabbitMQ 1.下载地址:http://www.rabbitmq.com/download.html 2.环境变量: set RABBITMQ_HOME = D:WorktoolsRabbitMQ Server; set PATH=%Path%;%RABBITMQ_HOME% abbitmq_server-3.8.3sbin; 3.RabbitMQ安装好后接下来安装RabbitMQ-Plugins。 1.打开命令行cd,输入RabbitMQ的sbin目录; 2.rabbitmq-plugins enable rabbitmq_management 3.rabbitmqctl status 验证安装成功 4.启动Rabbit-server rabbitmq-server
3.启动web服务,启动celery
python manage.py runserver 127.0.0.1:6000 python manage.py celery worker -l info # 如果要启动flower #python manage.py celery flower #python manage.py celery flower --basic_auth=username:password
4.在project的settings同级目录创建celery.py文件
# -*- coding:utf-8 -*- from __future__ import absolute_import, unicode_literals import os from celery import Celery, platforms from django.conf import settings import django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'projectname.settings') app = Celery('projectname') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda:settings.INSTALLED_APPS) platforms.C_FORCE_ROOT = True @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
5.在project的settings同级目录__init__.py文件添加:
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
6.在app目录创建tasks.py(必须是这个名字)
# -*- coding:utf-8 -*- from __future__ import absolute_import, unicode_literals from celery import shared_task from project.celery import app # 在需要异步的函数加装饰器@shared_task @shared_task def task_begin(cases): try: cmd = 'ipconfig' res = os.popen(cmd) print(cmd) except Exception as err: log.error("*" * 50 + " test fail " + "*" * 50) log.error(os.path.basename(__file__).replace(".py", "") + 'fail: %s' % str(err))
7.在views添加请求
# -*- coding:utf-8 -*- from django.shortcuts import render from django.http import request, response, HttpRequest, HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_protect, csrf_exempt from django.core import serializers import sys import os import json import requests import multiprocessing from .tasks import * def start(request): try: res = {"code": 200, "msg": "sucess"} if request.method == "POST": # 如果函数有参数,添加在delay里面 task_begin.delay() else: res["code"] = 405 res["msg"] = "method error" return HttpResponse(json.dumps(res), content_type="application/json") except Exception as err: raise Exception(str(err))
8.在settings中配置celery和rabbitmq:
import os from corsheaders import * # from .celeryconfig import * import djcelery djcelery.setup_loader() # celery配置 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERY_RESULT_BACKEND = 'amqp' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_BROKER_URL = 'amqp://rabbitmqguest:guest@localhost:5672/vhost' CELERY_TIMEZONE = 'Asia/Shanghai' # CELERY_REDIS_MAX_CONNECTIONS = 4 # CELERYD_CONCURRENCY = 4 # BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 5} # # BROKER_URL = 'redis://localhost:6379/0' # redis的配置,如果是rabbitmq就不是这个配置 # CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' DEBUG = False