zoukankan      html  css  js  c++  java
  • django使用celery搭配redis配置定时任务

    已经安装环境:

    Python3.6   

    django==2.1.8(用2.2.2需要升级sqlite3) 

    项目名称:ceshiproject   APP名称:ceshi 

    第一步:centos7下首先安装redis程序

    wget http://download.redis.io/releases/redis-5.0.5.tar.gz 或者 到官网https://redis.io/download 查看教程并下载
    tar xzf redis-5.0.5.tar.gz
    yum install gcc  gcc-c++  #安装一下依赖,如果有省略
    cd redis-5.0.5
    make MALLOC=libc
    cd src && make install
    ---------到此安装完成---------------
    cd redis-5.0.5  #进入安装目录
    ./redis-server  #启动redis
    ./redis-cli     #可进入redis命令模式,一般用不上

     第二步:安装django-celery==3.2.2,并在APP中添加加 'djcelery' (注意:如果超过3.2.2版本程序会报错:TypeError: can only concatenate list (not "tuple") to list)

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'ceshi',
        'djcelery'
    ]

     第三步:在项目ceshiproject下,新建配置文件celeryconfig.py,内容如下

    # -*- coding:utf-8 -*-
    import djcelery
    djcelery.setup_loader()
    from datetime import timedelta
    
    #设置任务运行时,使用的队列,以防造成定时,跟定期拥堵
    CELERY_QUEUES = {
        'beat_task1':{
            'exchange':'beat_task1',
            'exchange_type':'direct',
            'binding_key':'beat_task1'
        },
        'work_queue':{
            'exchange':'work_queue',
            'exchange_type':'direct',
            'binding_key':'work_queue'
        }
    }
    
    #设置默认使用的队列
    CELERY_DEFAULT_QUEUE = 'work_queue'
    
    
    #导入任务
    CELERY_IMPORTS=(
        'ceshi.task1',
    )
    
    #有些情况下防止死锁
    CELERY_FORCE_EXECV = True
    
    #设置并发的worker数量,根据cpu核心数量来定
    CELERY_CONCURRENCY = 4
    
    #任务失败,可以允许重试
    CELERY_ACKS_LATE = True
    
    #设置每一个worker可以执行多少个任务后销毁,防止内存泄漏
    CELERY_MAX_TASKS_PER_CHILD = 100
    
    #单个任务最大的运行时间,这边设置6分钟
    CELERY_TASK_TIME_LIMIT = 12 * 30
    
    
    
    # 定时任务
    CELERYBEAT_SCHEDULE = {
        'task1': {
            'task': 'ceshi_task',  # 任务名称
            'schedule': timedelta(seconds=10),  # 时间周期
            'options': {
                'queue': 'beat_task1'  # 消息队列
            }
        },
        # 'task2': {
        #     'task': 'update-db',  # 任务名称
        #     # 'schedule': crontab(minute=0, hour=0),  # 每天凌晨执行一次
        #     'schedule': timedelta(seconds=10),  #
        #     'options': {
        #         'queue': 'beat_tasks'
        #     }
        # }
    }

    第四步:在APP下面新建任务task1.py,内容如下:

    from celery.task import Task  #调用celery中的task模块
    import time
    class ceshitask(Task):
        name = "ceshi_task"
        def run(self, *args, **kwargs):
            print ("start ceshi task")
            time.sleep(5)
            print ("args={},kwargs={}".format(args,kwargs))
            print ("end task")

    第五步:配置好项目跟APP的路由

    项目ceshiproject路由:

    from django.contrib import admin
    from django.urls import path,include
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('', include('ceshi.urls',namespace='ceshi')),
    ]

    APP(ceshi)的路由

    from django.urls import path
    from . import views
    app_name="ceshi"
    urlpatterns = [
        path('do/', views.do,name='do'),
    ]

    第六步:

    首先安装redis驱动:pip install redis==2.10.6 

    redis==2.10.6 (高版本会报错,用于django内部驱动用跟下面安装程序是不一样)

    在setting.py中配置redis调用,引入项目下面的celerycofig.py

    from .celeryconfig import *
    BROKER_BACKEND = 'redis'
    BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

    第七步:在views.py中引入APP下面的task1,并调用

    from django.http import JsonResponse
    from ceshi.task1 import ceshitask
    
    def do(request):
        #执行异步任务
        print ('start request')
        ceshitask.delay()
        print ('end request')
        return JsonResponse({"result":"ok"})

    第八步:开启两个终端,并在终端中(Terminal)启动celery中的worker跟定时任务beat模块,进行测试

    python manage.py celery worker -l INFO   #首先开启worker
    python manage.py celerybeat -l INFO   #开启定时任务

     第九步:上面可以使用了,如果要在django中调用,进行一序列的操作,请继续往下走:

    1.把django中cache用于redis

    首先安装django-redis==4.10

    2.把下面的内容添加到django的settings.py中(redis默认不能超过16,如果

    redis://127.0.0.1:6379/10 设置为17会出现:redis.exceptions.ResponseError: DB index is out of range

    CACHES = {
        "default": {
            "BACKEND": "django_redis.cache.RedisCache",
            # 把这里缓存你的redis服务器ip和port
            "LOCATION": "redis://127.0.0.1:6379/10",
            "OPTIONS": {
                "CLIENT_CLASS": "django_redis.client.DefaultClient",
            }
        }
    }
    
    # 3.设置redis存储django的session信息
    SESSION_ENGINE = "django.contrib.sessions.backends.cache"
    SESSION_CACHE_ALIAS = "default"

    3.可以在task任务,通过下面命令设置redis,(注意:设置cache时候参数加None,即cache永久不失效)

    from django.core.cache import cache
    class
    ceshitask(Task): name = "ceshi_task" def run(self, *args, **kwargs): cache.set("start-ceshi-task",值, None) return "ceshi....."

    4.在views中调用

    from django.shortcuts import render,HttpResponse
    from django.core.cache import cache
    import json
    
    def num_recode(request):
        count = cache.get("start-ceshi-task")
        print(json.loads(count))
        return HttpResponse(count)
  • 相关阅读:
    两个list,并集、交集、差集
    关于apt-get install
    异步进程池
    获取cpu信息
    print重定向
    使用pandas操作excel
    排列组合,取所有可能性
    list_反转,切片,删除,升序降序
    抠图
    enumerate()函数的用法
  • 原文地址:https://www.cnblogs.com/weilaibuxiangshuo/p/11010294.html
Copyright © 2011-2022 走看看