zoukankan      html  css  js  c++  java
  • Django + celery +redis使用

    1.安装包

    pip install celery
    
    pip install django-celery
    
    pip install pymysql

    2.创建一个django项目

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
    - manage.py 

    3.修改__init__.py

    import pymysql
    pymysql.install_as_MySQLdb()

    4.修改settings.py,加celery配置

    # 是否启用celery任务
    IS_USE_CELERY = True
    # 本地开发的 celery 的消息队列(RabbitMQ)信息
    # BROKER_URL_DEV = 'amqp://guest:guest@127.0.0.1:5672/'
    BROKER_URL_DEV = 'redis://127.0.0.1:6379/1'
    # TOCHANGE 调用celery任务的文件路径, List of modules to import when celery starts.
    CELERY_IMPORTS = (
        'celerytest.task',
    )
    # ===============================================================================
    # CELERY 配置
    # ===============================================================================
    if IS_USE_CELERY:
        try:
            import djcelery
            import sys
            INSTALLED_APPS += (
                'djcelery',            # djcelery
            )
            djcelery.setup_loader()
            CELERY_ENABLE_UTC = False
            CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
            if "celery" in sys.argv:
                DEBUG = False
            # celery 的消息队列(redis)信息
            BROKER_URL = os.environ.get('BK_BROKER_URL', BROKER_URL_DEV)
            #BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672/'
           
        except:
            pass

    5.修改settings.py,改数据库配置

    # ===============================================================================
    # 数据库设置, 本地开发数据库设置
    # ===============================================================================
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',  # 默认用mysql
            'NAME': 'celerytest',                 # 数据库名 (默认与APP_ID相同)
            'USER': 'root',                       # 你的数据库user
            'PASSWORD': '123456',                  # 你的数据库password
            'HOST': '127.0.0.1',               # 开发的时候,使用localhost
            'PORT': '3306',                       # 默认3306
        },
    }

    6.修改urls.py,添加链接

    from django.conf.urls import include, url
    from django.contrib import admin
    import test
    urlpatterns = [
        url(r'^admin/', include(admin.site.urls)),
        url(r'^aa/$', test.test),
        url(r'^add/$', test.addtest),
        url(r'^del/$', test.deltest),
        url(r'^tt/$', test.startandstop),
    ]

    7.添加task.py,celery任务

    # -*- coding: utf-8 -*-
    import django     #测试不加如这两行启动worker要报错,而且需要加在最前
    django.setup()
    
    import time
    from celery import task,shared_task
    
    #直接执行
    @task()
    def sayhello():
        print('hello ...')
        time.sleep(2)
        print('world ...')
    
    #定时任务
    @shared_task(name="celerytest")
    def everysay(**kwargs):
        """定时任务调用执行作业"""
        sayadd(**kwargs)
    
    def sayadd(**kwargs):
        print('add1 ...')
        time.sleep(2)
        print(int(kwargs['x'])+int(kwargs['y']))

    8.添加test.py,调用任务

    # -*- coding: utf-8 -*-
    from django.shortcuts import render,HttpResponse
    from task import *
    from djcelery import models as djmodels
    import json
    def getcrontabtime(minute,hour,day_of_month,month_of_year,day_of_week): mycrontab = {} mycrontab['minute'] = minute mycrontab['hour'] = hour mycrontab['day_of_month'] = day_of_month mycrontab['month_of_year'] = month_of_year mycrontab['day_of_week'] = day_of_week return mycrontab def test(request): sayhello.delay()#如果有参数sayhello.delay(x,y,z) return HttpResponse("hello world") def addtest(request): '''添加定时任务''' task, created = djmodels.PeriodicTask.objects.get_or_create(name='123456', task='celerytest') #task对应的是task文件里的函数方法 mycrontab = getcrontabtime(5,'*','*','*','*') crontab = djmodels.CrontabSchedule.objects.filter(**mycrontab).first() if crontab is None: crontab = djmodels.CrontabSchedule.objects.create(**mycrontab) task.crontab = crontab task.enabled = True param_dic = {} param_dic['x'] = 5 param_dic['y'] = 2 # print param_dic # if type(param_dic) == list: # task.args = param_dic if type(param_dic) == dict: task.kwargs = json.dumps(param_dic) task.save() return HttpResponse("添加任务") def deltest(request): '''删除定时任务''' try: djmodels.PeriodicTask.objects.filter(name='123456').delete() except: return HttpResponse("删除定时任务失败") return HttpResponse("删除定时任务成功") def startandstop(request): '''启停定时作业''' status = request.GET.get('status','0') task, created = djmodels.PeriodicTask.objects.get_or_create(name='123456', task='celerytest') if status == '0': task.enabled = False task.save() return HttpResponse("停用成功") elif status == '1': task.enabled = True task.save() return HttpResponse("启用成功") else: return HttpResponse("系统错误")

    9.项目架构

    10.运行前准备

    #配置数据库迁移,生成celery需要的数据表
    python manage.py migrate
    #启动Redis
    sudo redis-server /etc/redis/redis.conf
    #启动worker
    python manage.py celery worker --loglevel=info
    #启动心跳
    python manage.py celery beat --max-interval=10 --loglevel=INFO
    --max-interval=10 :每十秒侦测一次任务 --loglevel=INFO:日志等级是INFO

    11.启动测试

    #启动web
    python manage.py runserver 0.0.0.0:9999
    #直接执行
    http://127.0.0.1:9999/aa/
    #添加定时服务
    http://127.0.0.1:9999/add/
    #删除定时服务
    http://127.0.0.1:9999/add/
    #暂停执行定时服务
    http://127.0.0.1:9999/tt/
    http://127.0.0.1:9999/tt/?status=0
    #启动执行定时服务
    http://127.0.0.1:9999/tt/?status=1

    作者:陈耿聪 —— 夕狱

    出处:https://www.cnblogs.com/CGCong/

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    《Java4Android视频教程》学习笔记(二)
    漫画
    MyEclipse启动时报 Unable to acquire application service. Ensure that the org.eclips
    linux下关闭桌面模式使用命令行模式及其它模式
    nginx 开启fastcgi 可支持php、python、perl等多种语言
    linux下安装ImageMagick和Imagick扩展
    nginx 平滑升级到Tengine并编译concat
    centos 安装详解
    for xml path group by
    泛型对象Lists转xml
  • 原文地址:https://www.cnblogs.com/CGCong/p/10191182.html
Copyright © 2011-2022 走看看