zoukankan      html  css  js  c++  java
  • Celery:进一步探索

    一、创建Celery专用模块

    对于大型项目,一般需要创建一个专用模块,便于管理。

    1.1 模块结构

    proj/__init__.py
        /celery.py
        /tasks.py
    

    proj/celery.py

    from celery import Celery
    
    app = Celery('proj',
                 broker='amqp://',
                 backend='rpc://',
                 include=['proj.tasks'])
    
    app.conf.update(
        result_expires=3600,
    )
    
    # include参数是 worker 进程启动时要导入的模块的列表。需要在此处添加我们的任务模块,以便 workers 能够找到我们的任务。
    

    proj/tasks.py

    from .celery import app
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.task
    def mul(x, y):
        return x * y
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    1.2 启动 worker 进程

    $ celery -A proj worker -l INFO
    
    • Concurrency:是用于同时处理任务的最大工作进程数,默认是CPU核心数,详情见celery worker -c。Celery还支持使用Eventlet,Gevent并在单个线程中运行(请参阅Concurrency
    • task events:是否发送Celery的监控信息,用于如Flower之类的实时监控程序,详见Monitoring and Management guide.
    • queues:worker进程可以拿取任务的队列集合,可以规定worker一次从多个队列中拿取任务,定制更高级的生产者消费者模型时参见Routing GuideWorkers Guide.

    1.3 停止 worker 进程

    当worker进程已经在前台运行时,使用 Control-c 即可停止运行。


    1.4 后台运行 worker 进程

    在生产环境,一般将Celery worker在后台运行,且使用celery multi命令在后台启动一个或多个worker进程。

    启动:

    $ celery multi start w1 -A proj -l INFO
    

    重启:

    $ celery  multi restart w1 -A proj -l INFO
    

    停止:

    # 停止,但不会等待工作程序关闭
    $ celery multi stop w1 -A proj -l INFO
    
    # 停止,并确保在退出之前已完成所有当前正在执行的任务
    $ celery multi stopwait w1 -A proj -l INFO
    

    详情见 daemonization tutorial.


    二、调用 Celery 任务

    可以使用delay()函数调用任务:

    >>> from proj.tasks import add
    >>> add.delay(2, 2)
    

    delay()方法实际上是apply_async()方法的快捷方式:

    >>> add.apply_async((2, 2))
    

    使用apply_async()方法可以指定选项,如运行时间,应发送到的队列等:

    >>> add.apply_async((2, 2), queue='lopri', countdown=10)
    # 任务将被发送到名为的队列中lopri,并且任务将最早在消息发送后10秒钟执行
    

    若直接应用任务将在当前进程中执行任务,不会发送任何消息:

    >>> add(2, 2)
    4
    

    使用上述三种方法组成了 Celery 任务调用的API接口, Calling User Guide 中有更详尽的描述。


    三、查看任务状态

    每个任务调用都将被赋予一个唯一的标识符(UUID)即任务ID:

    >>> res.id
    d6b3aea2-fb9b-4ebc-8da4-848818db9114
    

    result.get()出错时默认情况下会抛出异常,传递propagate参数可以不抛出异常,而是返回一个异常对象

    >>> res.get(propagate=False)
    TypeError("unsupported operand type(s) for +: 'int' and 'str'")
    

    查看任务执行结果:

    >>> res.failed()
    True
    
    >>> res.successful()
    False
    
    >>> res.state
    'FAILURE'
    
    • 关于查看任务状态的详细设置: States
    • 关于执行任务的详细设置: Calling Guide

    四、设计任务工作流

    4.1 函数签名

    是有我们可能希望将一个 “任务调用” 传递给另一个进程,或者作为另一个函数的参数,于是Celery为此使用了一种称为signature的函数。它包装单个“ 任务调用” 的参数和执行选项,这个 signature 可以传递给函数,甚至可以序列化并通过网络发送。

    创建一个任务签名:

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    
    # 快捷方式:
    >>> add.s(2, 2)
    tasks.add(2, 2)
    

    一个签名也是一个任务,也可以调用delayapply_async方法执行,区别在于签名可能已经指定了参数签名,完整的签名可以直接执行:

    >>> s1 = add.s(2, 2)
    >>> res = s1.delay()
    >>> res.get()
    4
    

    也可以创建不完整的签名,在在调用签名时补全其他参数(重复的参数会被新参数替代):

    >>> s2 = add.s(2)
    >>> res = s2.delay(8)
    >>> res.get()
    10
    

    所以,创建一个函数签名到底用来干啥呢?下面要说的 canvas 原语会用到。


    4.2 原语

    所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断。我们执行异步任务时,也可能会遇到这样的业务场景,即一组任务要么全部成功,要么全部失败。 canvas 原语就是用来定义这一组任务的执行,通过多种方式组合它们以构成复杂的工作流程。

    canvas 原语包括以下六种:


    Groups

    一个 group 会并行调用任务列表,返回一个特殊的结果实例,该实例让我们可以将结果作为一个组进行检查,并按顺序检索返回值。

    >>> from celery import group
    >>> from proj.tasks import add
    
    >>> group(add.s(i, i) for i in range(10))().get()
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    

    使用不完整签名函数的group:

    >>> g = group(add.s(i) for i in range(10))
    >>> g(10).get()
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    

    Chains

    链式调用,前者的结果作为后者的一个输入:

    >>> from celery import chain
    >>> from proj.tasks import add, mul
    
    # (4 + 4) * 8
    >>> chain(add.s(4, 4) | mul.s(8))().get()
    64
    

    partial chain:

    >>> # (? + 4) * 8
    >>> g = chain(add.s(4) | mul.s(8))
    >>> g(4).get()
    64
    

    可以直接省略chain关键字:

    >>> (add.s(4, 4) | mul.s(8))().get()
    64
    

    Chords

    chords 用于调用一个 group 的结果:

    >>> from celery import chord
    >>> from proj.tasks import add, xsum
    
    >>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
    90
    

    一个连接到 group 的 chain 会自动转化为一个chord:

    >>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
    90
    

    由于原语都是用签名函数,所以可以随意组合,如:

    >>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
    

    有关工作流程的更多信息:Canvas


    五、路由

    Celery支持AMQP协议提供的所有路由功能,也支持简单的路由规则,即:将消息发送的指定的队列。

    task_routes参数可以按名称路由任务,并将所有内容集中在一个位置:

    app.conf.update(
        task_routes = {
            'proj.tasks.add': {'queue': 'hipri'},
        },
    )
    

    然后,可以让 worker进程 在指定的队列中拿取任务:

    $ celery -A proj worker -Q hipri
    

    还可以为 worker进程 指定多个队列,例如让 worker进程 从 hipri队列 和 默认队列 中拿取任务(由于历史原因,celery队列就是默认队列):

    $ celery -A proj worker -Q hipri,celery
    

    队列的顺序无关紧要,因为 worker进程 将给予队列同等的权重,要了解有关路由的更多信息,包括充分利用AMQP路由功能,参阅Routing Guide


    六、远程监控

    如果使用RabbitMQ,Redis或Qpid作为代理,则可以在运行时监控 worker进程。

    例如,可以查看worker进程当前正在执行的任务:

    $ celery -A proj inspect active
    

    这是通过使用广播消息传递实现的,因此群集中的每个 worker 都将接收所有远程控制命令。如果未提供目的地,那么每个 worker 都会响应并回复请求,使用--destination选项指定一个或多个 worker 对请求执行操作,这是 worker 主机名的逗号分隔列表:

    $ celery -A proj inspect active --destination=celery@example.com
    

    其他更多监控命令,参考Monitoring Guide


    七、时区

    Celery内部和消息默认使用UTC时区,当 worker 收到一条消息(例如设置了倒计时)时,它将该UTC时间转换为本地时间。如果希望使用与系统时区不同的时区,则必须使用以下timezone设置进行配置:

    app.conf.timezone = 'Asia/Shanghai'
    

    八、进一步优化

    默认配置未针对吞吐量进行优化,默认情况下,它尝试在许多短期任务和较少的长期任务之间折衷,即吞吐量和公平调度间的折衷。

    如果有严格的公平调度要求,或者要针对吞吐量进行优化,参阅Optimizing Guide

    如果使用RabbitMQ,则可以安装librabbitmq模块,这是用C实现的AMQP客户端:

    $ pip install librabbitmq
    
  • 相关阅读:
    多线程中thread和runnable
    安装hive 个人遇到的问题小问题
    Linux 简单命令学习记录
    shell脚本简单学习教训经验
    @AutoWired使用
    <jsp:directive.page>标签
    Hibernate session.saveOrUpdate()方法
    无法连接远程mysql问题
    svn版本控制
    Hql中占位符(转)
  • 原文地址:https://www.cnblogs.com/bqzzz/p/14508936.html
Copyright © 2011-2022 走看看