zoukankan      html  css  js  c++  java
  • Django 学习之Celery(芹菜)

    Celery 介绍

    文档:http://docs.celeryproject.org/en/latest/index.html
    Celery 是一个功能完备,即插即用的异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务的执行。
    任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(RabbitMQ, Redis等)

    Celery 特点

    简单,易于使用和维护,有丰富的文档
    高效,单个Celery 进程 每分钟可以处理数百万个任务
    灵活,Celery 种几乎每个部分都可以自定义扩展
    Celery 非常易于集成到一些Web开发框架中

    Celery角色

    Celery clinet:这是任务生产者,它负责将任务(tasks)发送到broker中
    Broker:broker 负责将任务分发给响应的Celery worker
    Celery worker:这是任务的执行者,完成相应的业务逻辑,在具体实现上体现为Python 函数
    实现过程
    Celery 通过消息进行通信,通常使用一个叫broker(中间人)来协作client(任务的发出者)和worker(任务的处理者),client 发出消息到队列中,broker将队列中的消息发给worker 来处理
    一个Celery系统可以包含很多的worker和broker,可增强横向扩展和高可用性能

    Celery组成结构

    Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成
    1.消息中间件:Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,官方推荐RabbitMQ和Redis作为消息队列
    2.任务执行单元:Worker 是Celery提供的任务执行的单元,worker 并发的运行在分布式的系统节点中
    3.任务结果存储:Task result store 用来存储worker 执行的任务的结果,Celery 支持以不同方式存储任务的结果 包括AMQP, Redis,memcached,MongoDB,SQLAlchemy,Django ORM等等

    Celery 适应场景

    Celery 适用异步处理问题,比如发送邮件、文件上传、图像处理等比较耗时的操作,我们可以将其加入到任务队列去异步执行,这样用户不需要等待很久,提高用户体验。
    1.异步任务处理:例如给注册用户发送短信或者确认邮件的任务
    2.大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长
    3.定时执行的任务:支持任务的定时执行和设定时间执行,例如性能压测定时执行。

    Celery 的实现原理

    Celery基本实现原理是amqp协议的一个实现,比较耗时,耗资源的操作通过amqp协议发送到远端,让远端去处理它,在url进来,通过view返回的时候,在前端能够得到很快的响应,任务都在后台处理掉,提高用户的体验度,能够跟框架结合,执行一些可能不是立马需要结果的任务,这就是大家都用的异步任务。

    任务队列中包含任务的工作单元,有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理,是一种跨线程、跨机器工作的一种机制。

    Celery 的在Django中的实际使用(Django1.11.11, Celery4.0.2)

    1.在项目下创建具有标识Celery的Python包(脚本名),比如:celery_tasks

    2.在celery_tasks 包中创建main.py,用于作为Celery的启动文件
      1)为Celery使用Django配置文件
      import os

      if not os.getenv('DJANGO_SETTINGS_MODULE'):
        os.environ['DJANGO_SETTINGS_MODULE'] = '项目名.settings'
      2)创建Celery 对象
      from celery import Celery

      app = Celery('celery_tasks') # 一般设置main的参数为脚本名
      3)加载配置文件
      app.config_from_object('celery_tasks.config')
      4)设置自动加载任务的任务名
      app.autodiscover_tasks(['celery_tasks.xxx']) # xxx为celery_tasks 下的任务名

    3.在celery_tasks包中创建config.py 文件,用于保存Celery 的配置信息(推荐使用方法一RabbitMQ)
      1)RabbitMQ
        RabbitMQ是Celery的默认代理,因此除了要使用的代理实例的URL位置之外,它不需要任何其他依赖项或初始配置
        broker_url = 'amqp://myuser: mypassword@localhost:5672/myvhost'
        # 也可以在实例Celery 对象的时候指定 如:
        # app = Celery('celery_tasks',
        #       broker='amqp://myuser: mypassword@localhost:5672/myvhost',
        #       backend='rpc://'
        # )
        # backend 参数标识celery worker 执行完的结果需要保存,rpc 表示通过RPC(Remote Procedure Call)模式被送到RabbitMQ,

        # 如果不指定backend 参数,任务结果将被丢弃

       2)Redis
        broker_url = 'redis://127.0.0.1/14'
        result_backend = 'redis://127.0.0.1/15'
        # 也可以在实例Celery 对象的时候指定 如:
        # app = Celery('celery_tasks',
        #       broker='redis://127.0.0.1/14',
        #       backend='redis://127.0.0.1/15'
        # )

    4.创建任务,并让Celery 检测 (检测就是执行‘2.4)’)

    如:发送短信
    from libs.yuntongxun.sms import CCP
    from celery_tasks.main import app

    # 可以设置name参数,不设置任务名默认是该函数名的路径(如:.celery_tasks.sms.send_sms_code)
    # app.task装饰符告诉Celery 这个函数并不在celery client 端执行,当他们被调用时只将调用信息通过# brocker 发送给celery workers 执行

    @app.task(name='send_sms_code')
    def send_sms_code(mobile, sms_code):
      ccp = CCP()
      ccp.send_template_sms(mobile, [sms_code, 5], 1)

    5.调用Celery任务

    from celery_tasks.sms.tasks
      1)delay方法,比如:
        tasks.send_sms_code.delay(mobile, sms_code)
      2)apply_async方法,比如:
        # tasks.send_sms_code.apply_async((mobile, sms_code))
        tasks.send_sms_code.apply_async((mobile, sms_code), queue='xxx')
    由此可以看出,delay方法是apply_async 简化版本,但是apply_async方法可以带非常多的配置参数,包括指定队列等,queue指定队列名称,把不同任务分配不同的队列(推荐使用delay方法)

    6.执行命令,启动worker 让Celery 单独执行
    celery -A Celery实例对象路径 worker -l info 

    celery -A celery_tasks.main worker -l info     或    celery -A celery_tasks.main worker  --loglevel=info

    7.任务的状态

    PENDING : 等待
    STARTED : 开始
    RETRY: 重试
    FAILURE: 失败
    SUCCESS : 成功
    如:result = tasks.send_sms_code.delay(mobile, sms_code)
    通过result.state 或result.status来查看任务的状态
    更多 result 方法  查看http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result

  • 相关阅读:
    寒假了
    【MFC】浏览器中快速打开常用工具
    【转】MFC隐藏进程自身(任务管理器不可见,wSysCheck等工具可见)
    【原】DIY属于自己的鼠标侧键
    coco2dx 3.4final 使用scale9sprite
    linux挂载新硬盘
    关于c语言中的结构体使用偏移量求值问题
    Linux的网卡由eth0变成了eth1,如何修复
    oracle归档管理
    exsi上虚拟因硬盘不足无法启动
  • 原文地址:https://www.cnblogs.com/yungiu/p/10211836.html
Copyright © 2011-2022 走看看