zoukankan      html  css  js  c++  java
  • Django使用Celery异步任务队列

    1  Celery简介

    Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。

    任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ、Redis)。

    1.1  Celery原理

     

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    • 消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推荐使用:RabbitMQ、Redis作为消息队列。
    • 任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
    • 任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

    1.2     Celery适用场景

    • 异步任务处理:例如给注册用户发送短消息或者确认邮件任务。
    • 大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长。
    • 定时执行的任务:支持任务的定时执行和设定时间执行。例如性能压测定时执行。

    2      Celery开发环境准备

    2.1     环境准备

    软件名称

    版本号

    说明

    Linux

    Centos 6.5(64bit)

    操作系统

    Python

    3.5.2

    Django

    1.10

    Web框架

    Celery

    4.0.2

    异步任务队列

    Redis

    2.4

    消息队列

    2.2     Celery安装

    使用方法介绍:

    Celery的运行依赖消息队列,使用时需要安装redis或者rabbit。

    这里我们使用Redis。安装redis库:

    sudo yum install redis
    

      

    启动redis:

    sudo service redis start
    

    安装celery库

    sudo pip install celery==4.0.2

    3      Celery单独执行任务

    3.1     编写任务

    创建task.py文件

    说明:这里初始Celery实例时就加载了配置,使用的redis作为消息队列和存储任务结果。

     

    运行celery:

    $ celery -A task worker --loglevel=info

    看到下面的打印,说明celery成功运行。

     

    3.2     调用任务

     直接打开python交互命令行

     执行下面代码:

    可以celery的窗口看到任务的执行信息

     

    任务执行状态监控和获取结果:

     

    3.3     任务调用方法总结

    有两种方法:

    delay和apply_async ,delay方法是apply_async简化版。

    add.delay(2, 2)
    add.apply_async((2, 2))
    add.apply_async((2, 2), queue='lopri')

    delay方法是apply_async简化版本。

    apply_async方法是可以带非常多的配置参数,包括指定队列等

    • Queue 指定队列名称,可以把不同任务分配到不同的队列

    3.4     任务状态

    每个任务有三种状态:

    PENDING -> STARTED -> SUCCESS

    任务查询状态:

    res.state

    来查询任务的状态

     

    4      与Django集成

    上面简单介绍了celery异步任务的基本方法,结合我们实际的应用,我们需要与Django一起使用,下面介绍如何与Django结合。

    4.1     与Django集成方法

    与Django集成有两种方法:

    • Django 1.8 以上版本:与Celery 4.0版本集成
    • Django 1.8 以下版本:与Celery3.1版本集成,使用django-celery库

    今天我们介绍celery4.0 和django 1.8以上版本集成方法。

    4.2     创建项目文件

    创建一个项目:名字叫做proj

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
      - proj/wsgi.py
    - manage.py

    创建一个新的文件:proj/proj/mycelery.py

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
     
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
     
    app = Celery('proj')
     
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
     
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()

    在proj/proj/__init__.py:添加

    from __future__ import absolute_import, unicode_literals
     
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .mycelery import app as celery_app
     
    __all__ = ['celery_app']

    4.3     配置Celery

    我们在mycelery.py文件中说明celery的配置文件在settings.py中,并且是以CELERY开头。

       
    app.config_from_object('django.conf:settings', namespace='CELERY')

    在settings.py文件中添加celery配置:

     

    我们的配置是使用redis作为消息队列,消息的代理和结果都是用redis,任务的序列化使用json格式。

    重要:redis://127.0.0.1:6379/0这个说明使用的redis的0号队列,如果有多个celery任务都使用同一个队列,则会造成任务混乱。最好是celery实例单独使用一个队列。

    4.4     创建APP

    创建Django的App,名称为celery_task,在app目录下创建tasks.py文件。

    完成后目录结构为:

    ├── celery_task
    │   ├── admin.py
    │   ├── apps.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py
    │   ├── tests.py
    │   └── views.py
    ├── db.sqlite3
    ├── manage.py
    ├── proj
    │   ├── celery.py
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    └── templates

    4.5     编写task任务

    编辑任务文件

    tasks.py

    在tasks.py文件中添加下面代码

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    @shared_task
    def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)

    启动celery:

    celery -A proj.mycelery worker -l info

    说明:proj为模块名称,mycelery为celery的实例所在的文件。

    启动成功打印:

     

    4.6     在views中调用任务

    在views中编写接口,实现两个功能:

    • 触发任务,然后返回任务的结果和任务ID
    • 根据任务ID查询任务状态

    代码如下:

     

    启动django。

    新开一个会话启动celery;启动命令为:

    celery –A proj.mycelery worker –l info

    访问http://127.0.0.1:8000/add,可以看到返回的结果。

     

    在celery运行的页面,可以看到下面输出:

     

    4.7     在views中查询任务状态

    有的时候任务执行时间较长,需要查询任务是否执行完成,可以根据任务的id来查询任务状态,根据状态进行下一步操作。

    可以看到任务的状态为:SUCCESS

     

    5      Celery定时任务

    Celery作为异步任务队列,我们可以按照我们设置的时间,定时的执行一些任务,例如每日数据库备份,日志转存等。

    Celery的定时任务配置非常简单:

    定时任务的配置依然在setting.py文件中。

    说明:如果觉得celery的数据配置文件和Django的都在setting.py一个文件中不方便,可以分拆出来,只需要在mycelery.py的文件中指明即可。

    app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

    5.1     任务间隔运行

    #每30秒调用task.add
    from datetime import timedelta
    
    CELERY_BEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=30),
            'args': (16, 16)
        },
    }

    5.2     定时执行

    定时每天早上7:30分运行。

    注意:设置任务时间时注意时间格式,UTC时间或者本地时间。

    #crontab任务
    #每天7:30调用task.add
    from celery.schedules import crontab
    
    CELERY_BEAT_SCHEDULE = {
        # Executes every Monday morning at 7:30 A.M
        'add-every-monday-morning': {
            'task': 'tasks.add',
            'schedule': crontab(hour=7, minute=30),
            'args': (16, 16),
        },
    }

    5.3     定时任务启动

    配置了定时任务,除了worker进程外,还需要启动一个beat进程。

    Beat进程的作用就相当于一个定时任务,根据配置来执行对应的任务。

    5.3.1  启动beat进程

    命令如下:

    celery -A proj.mycelery beat -l info

     

    5.3.2  启动worker进程

    Worker进程启动和前面启动命令一样。

    celery –A proj.mycelery worker –l info

    6      Celery深入

    Celery任务支持多样的运行模式:

    • 支持动态指定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
    • 支持链式任务
    • 支持Group任务
    • 支持任务不同优先级
    • 支持指定任务队列
    • 支持使用eventlet模式运行worker

    例如:指定并发数为1000

    celery -A proj.mycelery worker -c 1000

    这些可以根据使用的深入自行了解和学习。

     

    7      参考资料

    Celery官网:

    http://docs.celeryproject.org/en/latest/index.html

    Celery与Django:

    http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps

    celery定时任务:

    http://blog.csdn.net/sicofield/article/details/50937338

  • 相关阅读:
    jira启动错误解决:Failed to read artifact descriptor for com.atlassian.plugins.rest:atlassian-rest-doclet:jar:2.9.2:
    jira8.0 api变化--含解决方法
    jira spring scanner注意事項
    pom文件添加aliyun镜像
    jira插件打包时报osgi的错误
    jira 根据项目(project)获取优先级(proirity)
    scriptrunner fragments设置web resource的路径
    Ubuntu16.04 LTS上安装Go1.10
    ubuntu 常见错误--Could not get lock /var/lib/dpkg/lock 问题修改
    VS2015编译OpenSSL
  • 原文地址:https://www.cnblogs.com/StitchSun/p/8552488.html
Copyright © 2011-2022 走看看