zoukankan      html  css  js  c++  java
  • 分布式框架Celery(转)

    一.简介

        Celery是一个异步任务的调度工具。
    
        Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。
    
        在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。
    
        这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。
    我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。 Celery(芹菜)是一个异步任务队列
    /基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。
    它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,
    使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

    二.Celery 的架构

      

      Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。我们需要一个消息队列来下发我们的任务。
    首先要有一个消息中间件,此处选择rabbitmq (也可选择 redis 或 Amazon Simple Queue Service(SQS)消息队列服务)。推荐 选择 rabbitmq 。使用RabbitMQ是官方特别推荐的方式。它的架构组成如下图:

    Celery_framework

    可以看到,Celery 主要包含以下几个模块:

    • 任务模块 Task

      包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    • 消息中间件 Broker

      Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。

    各个中间件的比较:

      

    三.安装及初步使用

    pip install celery #安装celery 

      小试牛刀:1. 定义任务函数。2. 运行celery服务。3. 客户应用程序的调用。

      编写一个tasks.py的文件

    # encoding: utf-8
    from celery import Celery
    #任务调度名称,指定中间件和结果存储的位置
    app = Celery('tasks',broker="redis://127.0.0.1:6379/0",backend='redis://127.0.0.1:6379/1')
    
    @app.task
    def add(x,y):
        return x + y

      上述代码导入了celery,然后创建了celery 实例 app,实例化的过程中指定了任务名tasks(和文件名一致),传入了broker和backend。然后创建了一个任务函数add

      运行:

    celery -A task worker -l info -P eventlet

      这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别,-P这个参数是celery4.x之后要使用的,否则会报错。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,

    worker界面展示信息如下:

      

      如果你不想使用 celery 命令来启动 worker,可直接使用文件来驱动,修改task.py (增加入口函数main)

    if __name__ == '__main__':
        app.start()

      再执行

    python task.py worker

       终端调用(需要在tasks.py同级目录下运行):

    D:	omcelery
    λ python
    Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from tasks import add
    >>> r=add.delay(3,4)
    >>> print(r.get())
    7
    >>> print(r.result)
    7
    >>> print(r.ready())

      到redis看一下结果:

    另外一种方法:

      启用任务和的调度都使用脚本 

      功能:模拟一个耗时操作,并打印 worker 所在机器的 IP 地址,中间人和结果存储都使用 redis 数据库

    #encoding=utf-8
    #filename my_first_celery.py
    from celery import Celery
    import time
    import socket
    
    app = Celery('tasks', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' )
    
    def get_host_ip():
        """
        查询本机ip地址
        :return: ip
        """
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(('8.8.8.8', 80))
            ip = s.getsockname()[0]
        finally:
            s.close()
        return ip
    
    
    @app.task
    def add(x, y):
        time.sleep(3) # 模拟耗时操作
        s = x + y
        print("主机IP {}: x + y = {}".format(get_host_ip(),s))
        return s 

      启动这个 worker:

    celery -A my_first_celery worker -l info -P eventlet

      这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别。可以使用 celery –help 命令来查看celery命令的帮助文档。执行命令后,worker界面展示信息如下:

      

      调用任务:

      在 my_first_celery.py 的同级目录下编写如下脚本 start_task.py如下。

      

    #encoding=utf-8
    from my_first_celery import add #导入我们的任务函数add
    import time
    result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行
    
    while not result.ready():# 循环检查任务是否执行完毕
        print(time.strftime("%H:%M:%S"))
        time.sleep(1)
    
    print(result.get()) #获取任务的返回结果
    print(result.successful()) #判断任务是否成功执行

      执行

    python start_task.py

      结果如下所示:

      

      发现等待了大约3秒钟后,任务返回了结果24,并且是成功完成,此时worker界面增加的信息如下:

      

      这里的信息非常详细,其中2004447e-1183-4d65-ae5d-bd8ead70e216是taskid,只要指定了 backend,根据这个 taskid 可以随时去 backend 去查找运行结果,使用方法如下:

    >>> from my_first_celery import add
    >>> taskid='2004447e-1183-4d65-ae5d-bd8ead70e216'
    >>> add.AsyncResult(taskid).get()
    24
    >>> 
    
    
    #或者
    >>> from celery.result import AsyncResult
    >>> AsyncResult(taskid).get()
    24

      重要说明:如果想远程执行 worker 机器上的作业,请将 my_first_celery.py 和 start_tasks.py 复制到远程主机上(需要安装
    celery),修改 my_first_celery.py 指向同一个中间人和结果存储,再执行 start_tasks.py 即可远程执行 worker 机器上的作业。my_first_celery.add函数的代码不是必须的,你也要以这样调用任务:

    from my_first_celery import app
    app.send_task("my_first_celery.add",args=(1,3))

    四.第一个 celery 项目

      在生产环境中往往有大量的任务需要调度,单独一个文件是不方便的,celery 当然支持模块化的结构,我这里写了一个用于学习的 Celery 小型工程项目,含有队列操作,任务调度等实用操作,目录树如下所示:

     
        
      其中 init.py是空文件,目的是告诉 Python myCeleryProj 是一个可导入的包.
      app.py
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #author tom
    
    from celery import Celery
    
    app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])
    
    app.config_from_object("myCeleryProj.settings")
    
    if __name__ == "__main__":
        app.start()

      settings.py

    #!/usr/bin/env python
    # -*- codin
    # g: utf-8 -*-
    #author tom
    from kombu import Queue
    import re
    from datetime import timedelta
    from celery.schedules import crontab
    
    
    CELERY_QUEUES = (  # 定义任务队列
        Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
        Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
        Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
    )
    
    CELERY_TASK_DEFAULT_QUEUE = "default"  # 设置默认队列名为 default
    CELERY_TASK_DEFAULT_EXCHANGE = "tasks"
    CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
    CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"
    
    CELERY_ROUTES = (
        [
            (
                re.compile(r"myCeleryProj.tasks.(taskA|taskB)"),
                {"queue": "tasks_A", "routing_key": "A.import"},
            ),  # 将tasks模块中的taskA,taskB分配至队列 tasks_A ,支持正则表达式
            (
                "myCeleryProj.tasks.add",
                {"queue": "default", "routing_key": "task.default"},
            ),  # 将tasks模块中的add任务分配至队列 default
        ],
    )
    
    
    # CELERY_ROUTES = (
    #    [
    #        ("myCeleryProj.tasks.*", {"queue": "default"}), # 将tasks模块中的所有任务分配至队列 default
    #    ],
    # )
    
    # CELERY_ROUTES = (
    #    [
    #        ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
    #        ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
    #        ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
    #    ],
    # )
    
    BROKER_URL = "redis://127.0.0.1:6379/0"  # 使用redis 作为消息代理
    
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"  # 任务结果存在Redis
    
    CELERY_RESULT_SERIALIZER = "json"  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
    
    
    CELERYBEAT_SCHEDULE = {
        "add": {
            "task": "myCeleryProj.tasks.add",
            "schedule": timedelta(seconds=10),
            "args": (10, 16),
        },
        "taskA": {
            "task": "myCeleryProj.tasks.taskA",
            "schedule": crontab(hour=21, minute=10),
        },
        "taskB": {
            "task": "myCeleryProj.tasks.taskB",
            "schedule": crontab(hour=21, minute=12),
        },
    }

      readme.txt

    #启动 worker 
    #分别在三个终端窗口启动三个队列的worker,执行命令如下所示:
    celery -A myCeleryProj.app worker -Q default -l info
    celery -A myCeleryProj.app worker -Q tasks_A -l info
    celery -A myCeleryProj.app worker -Q tasks_B -l info
    #当然也可以一次启动多个队列,如下则表示一次启动两个队列tasks_A,tasks_B。
    celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info
    #则表示一次启动两个队列tasks_A,tasks_B。
    #最后我们再开启一个窗口来调用task: 注意观察worker界面的输出
    >>> from myCeleryProj.tasks import *
    >>> add.delay(4,5);taskA.delay();taskB.delay() #同时发起三个任务
    <AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474>
    <AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6>
    <AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab>
    #也可以使用下面的方法调用task
    >>> from myCeleryProj.app import app
    >>> app.send_task(myCeleryProj.tasks.add,args=(4,5)
    >>> app.send_task(myCeleryProj.tasks.taskA)
    >>> app.send_task(myCeleryProj.tasks.taskB)

    转自:https://blog.csdn.net/somezz/article/details/82343346

    五.管理与监控

      Celery管理和监控功能是通过flower组件实现的,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理。

      安装使用

      

    pip3 install flower

      

      启动

     flower -A project --port=5555   
    # -A :项目目录
    #--port 指定端口

      访问http:ip:5555

        

        api使用,例如获取woker信息  

        

    curl http://127.0.0.1:5555/api/workers

        结果:

        

      更多api参考:https://flower.readthedocs.io/en/latest/api.html

  • 相关阅读:
    Web开发细节搜集
    excel设置单元格为文本
    网页QQ唤起
    .net提高文章
    代码重构学习
    js的undefined怎么判断
    微软.net一些类的源码
    FineMessBox的js依赖导致错误Uncaught ReferenceError: addEvent is not defined
    [译转]深入理解LayoutInflater.inflate()
    java 和 Android Base64加密
  • 原文地址:https://www.cnblogs.com/tjp40922/p/11345525.html
Copyright © 2011-2022 走看看