zoukankan      html  css  js  c++  java
  • django+xadmin+djcelery实现后台管理定时任务

    继上一篇中间表的数据是动态的,图表展示的数据才比较准确。这里用到一个新的模块Djcelery,安装配置步骤如下:

    1.安装

      redis==2.10.6

      celery==3.1.23

      django-celery==3.1.17

      flower==0.9.2

      supervisor==3.3.4

     flower用于监控定时任务,supervisor管理进程,可选

    2.配置

    settings.py中添加以下几行:

    #最顶头加上
    from __future__ import absolute_import
    
    # celery settings
    import djcelery
    djcelery.setup_loader()
    BROKER_URL = 'redis://localhost:6379'
    # BROKER_URL = 'redis://:密码@主机地址:端口号/数据库号'
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERYD_MAX_TASKS_PER_CHILD = 40
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    INSTALLED_APPS = [
      'djcelery',# 添加djcelery
    ]

     3.注册定时任务的几个表

    from __future__ import absolute_import, unicode_literals
    from djcelery.models import (
        TaskState, WorkerState,
        PeriodicTask, IntervalSchedule, CrontabSchedule,
    )
    from xadmin.sites import site
    site.register(IntervalSchedule) # 存储循环任务设置的时间
    site.register(CrontabSchedule) # 存储定时任务设置的时间
    site.register(PeriodicTask) # 存储任务
    site.register(TaskState) # 存储任务执行状态
    site.register(WorkerState) # 存储执行任务的worker

    4.主应用下添加celery.py,__init__.py修改如下:

    # __init__.py
    from __future__ import absolute_import
    from .celery import app as celery_app
    
    
    # celery.py
    from __future__ import absolute_import
    
    import os
    from celery import Celery, platforms
    from django.conf import settings
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hermes.settings')
    
    # hermes主应用名
    app = Celery('hermes')
    platforms.C_FORCE_ROOT = True
    
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))

    5.添加任务 应用下添加tasks.py

    from __future__ import absolute_import
    
    from celery import task
    import time
    
    from .channels import Cache_data_to_redis
    
    # 更新指定日期数据到sms_organizationcount
    @task
    def readAndWrite(begin,end):
        begin = str(begin)[:4] + '-' + str(begin)[4:6] + '-' + str(begin)[6:8]
        end = str(end)[:4] + '-' + str(end)[4:6] + '-' + str(end)[6:8]
        i = 0
        begin_time = time.time()
        read = Cache_data_to_redis().connection
        Rcursor = read.cursor()
        query = "SELECT id from sms_organizationcount WHERE alia_date_time between '"
        query += begin
        query += "' and '"
        query += end
        query += "'"
        readSql = "SELECT alia_month_time, alia_date_time, count(*) as total_nums, count(t.`status`=2 or null) as error_nums, name FROM 
                  (select *, DATE_FORMAT(req_time,'%Y-%m') as alia_month_time, DATE_FORMAT(req_time,'%Y-%m-%d') as alia_date_time, 
                  LEFT(body,LOCATE('】',body)) as name from sms_smslog where LOCATE('】',body) >0 
                  and LEFT(body,1)='【' and DATE_FORMAT(req_time,'%Y-%m-%d') between '"
        readSql += begin
        readSql += "' and '"
        readSql += end
        readSql += "')"
        readSql += " as t GROUP BY alia_date_time , name;"
        Rcursor.execute(readSql)
        readResult = Rcursor.fetchall()
        Rcursor.execute(query)
        query_result = Rcursor.fetchall()
        deleteSql = "delete from sms_organizationcount where alia_date_time between '%s' and '%s'" % (begin,end)
        if query_result:
            delete_record = Cache_data_to_redis().connection
            Dcursor = delete_record.cursor()
            Dcursor.execute(deleteSql)
            delete_record.commit()
            delete_record.close()
           for value in readResult:
            write = Cache_data_to_redis().connection
            Wcursor = write.cursor()
            writeSql = "INSERT into sms_organizationcount (alia_month_time, alia_date_time, total_nums, error_nums, `name`) " 
                       " VALUES ('%s', '%s', '%s', '%s', '%s' )" %
                       (value['alia_month_time'], value['alia_date_time'], value['total_nums'], value['error_nums'], value['name'])
            try:
                Wcursor.execute(writeSql)
                i += 1
                write.commit()
            except:
                write.rollback()
            write.close()
        read.close()
        end_time = time.time()
        pass_time = end_time - begin_time
        return i, pass_time

    6.最终效果如下图:

    7.终端启动celery命令:

    # 查看注册的task
    celery -A hermes inspect registered
    # 启动
    python  manage.py celery -A django_celery_demo  worker -B    # django_celery_demo为celery和setting所在文件夹名
    
    #celery_beat起不来
    # 动态的输出启动进程时的输出
    supervisorctl tail programname stdout
    
    
    # flower监控celery
    python manage.py celery flower
    ip:5555
    实践出真知~
  • 相关阅读:
    楷书四大家
    什么叫同学?
    css悬浮在页面顶端
    jq给页面添加覆盖层遮罩的实例
    jQuery实现遮罩层
    jQuery实现的文字逐行向上间歇滚动效果示例
    jquery实现文字由下到上循环滚动的实例代码
    Js/Jquery获取网页屏幕可见区域高度
    本机无法访问虚拟机配置的域名
    thinkphp5+GatewayWorker+Workerman
  • 原文地址:https://www.cnblogs.com/NolaLi/p/9469994.html
Copyright © 2011-2022 走看看