zoukankan      html  css  js  c++  java
  • Celery分布式任务队列的认识和基本操作

    一、简单认识

      Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。

    它的特点有:

    • 简单:熟悉了它的流程后,配置使用简单;
    • 高可用:任务执行失败或执行过程中发生连接中断,Celery会自动重新执行任务;
    • 快速:一个单进程的Celery每分钟可处理上百万个任务;
    • 灵活:Celery的各个组件都可以被扩展及自定制;  

    应用场景举例:

      1.web应用:用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站并发及用户体验感。

      2.任务场景:需要批量在几百台机器执行某些命令或者任务,Celery可以轻松搞定。

      3.定时任务:向定时导数据报表、定时发送通知类似场景,Celery可以提供管理接口和丰富的API。

    二、架构和工作原理

    Celery由以下三部分构成:消息中间件(Broker)、任务执行单元(Worker)、结果存储(Backend):来个图

    消息中间件(Broker):

      消息中间件Broker支持RabbitMQ、Redis、MongoDB、Memcached 等,官方推荐RabbitMQ。

    任务执行单元(Worker)

      Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。

    结果存储(Backend)

      Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。

    工作原理

      任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;

      任务执行单元Worker实时监视消息队列获取队列中的任务执行;

      Woker执行完任务后将结果保存在Backend中;

    三、安装使用

    Redis的安装:

      参考我的另外一篇:https://www.cnblogs.com/Utopia-Clint/p/10868258.html

    Celery的安装:

    一个简单的应用(Linux环境下)

    注意:此时并没有将配置文件、任务文件及初始化文件分开,真的到应用环境中是要分开的,后面会提到;

    创建一个文件目录:

    mkdir /root/celery_study

    在celery_study创建文件task.py

     

    task.py:任务定义文件

    # -*- coding:utf-8 -*-
    # @Author  : Clint
    from celery import Celery
    app = Celery('task',
                 broker='redis://:123456@localhost:6379',
                 backend='redis://:123456@localhost:6379',
                 )
    
    
    @app.task
    def add(x, y):
        print("running...", x, y)
        return x + y

     启动Worker

    celery -A task worker  --loglevel=info

    各个参数含义:

      worker: 代表第启动的角色是work当然还有beat等其他角色;

      -A :项目路径,这里我的目录是task;

      -loglevel:启动的日志级别,有info、debug等,更多参数使用celery --help查看

    任务队列已经准备就绪;

    我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果:

    AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:

    • state: 返回任务状态;
    • task_id: 返回任务id;
    • result: 返回任务结果,同get()方法;
    • ready(): 判断任务是否以及有结果,有结果为True,否则False;
    • info(): 获取任务信息,默认为结果;
    • wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
    • successfu(): 判断任务是否成功,成功为True,否则为False;
  • 相关阅读:
    【转】简单地用nfs实现linux间文件共享
    CentOS 6.1上部署SVN时遇到的问题及解决方法
    ubuntu下部署SVN服务
    LINUX GBK>UTF8文件编码批量转换脚本[转]
    Thinkpad E420 3YC(双显卡)安装64位Ubuntu 11.10笔记
    拦截器详解
    快排的算法
    冒泡排序的算法
    struts2的输入校验
    ssh整合开发
  • 原文地址:https://www.cnblogs.com/Utopia-Clint/p/10861899.html
Copyright © 2011-2022 走看看