zoukankan      html  css  js  c++  java
  • Django与Celery配合实现定时任务

    一、前言

      Celery是一个基于python开发的分布式任务队列,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。

    ​ 另一方面,当我们需要处理一些定时任务时Celery强大的生态环境也是他的优势。

    ​ 在刚刚学习如何使用Celery时可能会觉得难以入手,我利用工作之余研究这些文档前前后后也花了快一个周的样子,目前已经可以满足我的需求,所以静下心多写多测试,加油。

    Ft8aX4.png

    二、配置使用

      celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。

    本示例使用主要依赖包如下:

    celery==4.2.1
    Django==1.11.7
    django-celery-beat==1.4.0
    django-celery-results==1.0.4
    PyMySQL==0.9.2
    redis==2.10.6
    

    配置

      新建立项目celery_demo,目录结构(每个app下多了个tasks文件,用于定义任务):

    celery_demo
    ├── app01
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py
    │   └── views.py
    ├── manage.py
    ├── celery_demo
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    └── templates
    

    在项目目录celery_demo/celery_demo/目录下新建celery.py:

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    # Author:wd
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # 设置django环境
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_demo.settings')  
    app = Celery('celery_demo')
    #  使用CELERY_ 作为前缀,在settings中写配置
    app.config_from_object('django.conf:settings', namespace='CELERY') 
    # 发现任务文件每个app下的task.py
    app.autodiscover_tasks()  
    

    celery_demo/celery_demo/__init__.py写入:

    from __future__ import absolute_import, unicode_literals
    from .celery import app as celery_app
    __all__ = ['celery_app']
    

    celery_demo/celery_demo/settings.py写入:

    CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件
    
    CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
    

    进入项目的celery_demo目录启动worker

    celery worker -A taskproj -l debug
    

    定义与触发任务

    任务定义在每个tasks文件中,app01/tasks.py

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    

    视图中触发任务

    from django.http import JsonResponse
    from app01 import tasks
    
    # Create your views here.
    
    def index(request,*args,**kwargs):
        res=tasks.add.delay(1,3)
        #任务逻辑
        return JsonResponse({'status':'successful','task_id':res.task_id})
    

    访问http://127.0.0.1:8000/index

    img

    若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取:

    img

    扩展

      除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。

    1. 安装
    pip install django-celery-results
    
    1. 配置settings.py,注册app
    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    
    1. 修改backend配置,将Redis改为django-db
    #CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis
    
    CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储
    
    1. 修改数据库
    python3 manage.py migrate django_celery_results
    

    此时会看到数据库会多创建:

    img 当然你有时候需要对task表进行操作,以下源码的表结构定义:

    class TaskResult(models.Model):
        """Task result/status."""
    
        task_id = models.CharField(_('task id'), max_length=255, unique=True)
        task_name = models.CharField(_('task name'), null=True, max_length=255)
        task_args = models.TextField(_('task arguments'), null=True)
        task_kwargs = models.TextField(_('task kwargs'), null=True)
        status = models.CharField(_('state'), max_length=50,
                                  default=states.PENDING,
                                  choices=TASK_STATE_CHOICES
                                  )
        content_type = models.CharField(_('content type'), max_length=128)
        content_encoding = models.CharField(_('content encoding'), max_length=64)
        result = models.TextField(null=True, default=None, editable=False)
        date_done = models.DateTimeField(_('done at'), auto_now=True)
        traceback = models.TextField(_('traceback'), blank=True, null=True)
        hidden = models.BooleanField(editable=False, default=False, db_index=True)
        meta = models.TextField(null=True, default=None, editable=False)
    
        objects = managers.TaskResultManager()
    
        class Meta:
            """Table information."""
    
            ordering = ['-date_done']
    
            verbose_name = _('task result')
            verbose_name_plural = _('task results')
    
        def as_dict(self):
            return {
                'task_id': self.task_id,
                'task_name': self.task_name,
                'task_args': self.task_args,
                'task_kwargs': self.task_kwargs,
                'status': self.status,
                'result': self.result,
                'date_done': self.date_done,
                'traceback': self.traceback,
                'meta': self.meta,
            }
    
        def __str__(self):
            return '<Task: {0.task_id} ({0.status})>'.format(self)
    

    三、Django中使用定时任务

      如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。

    安装配置

    1.beat插件安装

    pip3 install django-celery-beat
    

    2.注册APP

    INSTALLED_APPS = [
        ....   
        'django_celery_beat',
    ]
    

    3.数据库变更

    python3 manage.py migrate django_celery_beat
    

    4.分别启动woker和beta

    celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库
    
    celery worker -A taskproj -l info #启动woker
    

    5.配置admin

    urls.py写入:

    # urls.py
    from django.conf.urls import url
    from django.contrib import admin
     
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
    ]
    

    6.创建用户

    python3 manage.py createsuperuser 
    

    7.登录admin进行管理(地址http://127.0.0.1:8000/admin)并且还可以看到我们上次使用orm作为结果存储的表。

    http://127.0.0.1:8000/admin/login/?next=/admin/

    img

    使用示例:

    img

    img

    查看结果:

    img

    二次开发

      django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:

    settings.py

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'app01.apps.App01Config',
        'django_celery_results',
        'django_celery_beat',
        'rest_framework',
    ]
    

    urls.py

    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^index$', views.index),
        url(r'^res$', views.get_res),
        url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
    ]
    

    views.py

    from django_celery_beat.models import PeriodicTask  #倒入插件model
    from rest_framework import serializers
    from rest_framework import pagination
    from rest_framework.viewsets import ModelViewSet
    class Userserializer(serializers.ModelSerializer):
        class Meta:
            model = PeriodicTask
            fields = '__all__'
    
    class Mypagination(pagination.PageNumberPagination):
        """自定义分页"""
        page_size=2
        page_query_param = 'p'
        page_size_query_param='size'
        max_page_size=4
    
    class TaskView(ModelViewSet):
        queryset = PeriodicTask.objects.all()
        serializer_class = Userserializer
        permission_classes = []
        pagination_class = Mypagination
    

    访问http://127.0.0.1:8000/tasks如下:

    img

    参考资料:

    W-D:https://www.cnblogs.com/wdliu/p/9530219.html

    还在琢磨:https://blog.csdn.net/mbl114/article/details/78047175

    Celery文档:http://docs.celeryproject.org/en/latest/

  • 相关阅读:
    Atitit 华为基本法 attilax读后感
    Atitit 华为管理者内训书系 以奋斗者为本 华为公司人力资源管理纲要 attilax读后感
    Atitit 项目版本管理gitflow 与 Forking的对比与使用
    Atitit 管理的模式扁平化管理 金字塔 直线型管理 垂直管理 水平管理 矩阵式管理 网状式样管理 多头管理 双头管理
    Atitit 乌合之众读后感attilax总结 与读后感结构规范总结
    深入理解 JavaScript 异步系列(4)—— Generator
    深入理解 JavaScript 异步系列(3)—— ES6 中的 Promise
    深入理解 JavaScript 异步系列(2)—— jquery的解决方案
    深入理解 JavaScript 异步系列(1)——基础
    使用 github + jekyll 搭建个人博客
  • 原文地址:https://www.cnblogs.com/huang-yc/p/10110754.html
Copyright © 2011-2022 走看看