zoukankan      html  css  js  c++  java
  • Celery:进一步探索

    一、创建Celery专用模块

    对于大型项目,一般需要创建一个专用模块,便于管理。

    1.1 模块结构

    proj/__init__.py
        /celery.py
        /tasks.py
    

    proj/celery.py

    from celery import Celery
    
    app = Celery('proj',
                 broker='amqp://',
                 backend='rpc://',
                 include=['proj.tasks'])
    
    app.conf.update(
        result_expires=3600,
    )
    
    # include参数是 worker 进程启动时要导入的模块的列表。需要在此处添加我们的任务模块,以便 workers 能够找到我们的任务。
    

    proj/tasks.py

    from .celery import app
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.task
    def mul(x, y):
        return x * y
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    

    1.2 启动 worker 进程

    $ celery -A proj worker -l INFO
    
    • Concurrency:是用于同时处理任务的最大工作进程数,默认是CPU核心数,详情见celery worker -c。Celery还支持使用Eventlet,Gevent并在单个线程中运行(请参阅Concurrency
    • task events:是否发送Celery的监控信息,用于如Flower之类的实时监控程序,详见Monitoring and Management guide.
    • queues:worker进程可以拿取任务的队列集合,可以规定worker一次从多个队列中拿取任务,定制更高级的生产者消费者模型时参见Routing GuideWorkers Guide.

    1.3 停止 worker 进程

    当worker进程已经在前台运行时,使用 Control-c 即可停止运行。


    1.4 后台运行 worker 进程

    在生产环境,一般将Celery worker在后台运行,且使用celery multi命令在后台启动一个或多个worker进程。

    启动:

    $ celery multi start w1 -A proj -l INFO
    

    重启:

    $ celery  multi restart w1 -A proj -l INFO
    

    停止:

    # 停止,但不会等待工作程序关闭
    $ celery multi stop w1 -A proj -l INFO
    
    # 停止,并确保在退出之前已完成所有当前正在执行的任务
    $ celery multi stopwait w1 -A proj -l INFO
    

    详情见 daemonization tutorial.


    二、调用 Celery 任务

    可以使用delay()函数调用任务:

    >>> from proj.tasks import add
    >>> add.delay(2, 2)
    

    delay()方法实际上是apply_async()方法的快捷方式:

    >>> add.apply_async((2, 2))
    

    使用apply_async()方法可以指定选项,如运行时间,应发送到的队列等:

    >>> add.apply_async((2, 2), queue='lopri', countdown=10)
    # 任务将被发送到名为的队列中lopri,并且任务将最早在消息发送后10秒钟执行
    

    若直接应用任务将在当前进程中执行任务,不会发送任何消息:

    >>> add(2, 2)
    4
    

    使用上述三种方法组成了 Celery 任务调用的API接口, Calling User Guide 中有更详尽的描述。


    三、查看任务状态

    每个任务调用都将被赋予一个唯一的标识符(UUID)即任务ID:

    >>> res.id
    d6b3aea2-fb9b-4ebc-8da4-848818db9114
    

    result.get()出错时默认情况下会抛出异常,传递propagate参数可以不抛出异常,而是返回一个异常对象

    >>> res.get(propagate=False)
    TypeError("unsupported operand type(s) for +: 'int' and 'str'")
    

    查看任务执行结果:

    >>> res.failed()
    True
    
    >>> res.successful()
    False
    
    >>> res.state
    'FAILURE'
    
    • 关于查看任务状态的详细设置: States
    • 关于执行任务的详细设置: Calling Guide

    四、设计任务工作流

    4.1 函数签名

    是有我们可能希望将一个 “任务调用” 传递给另一个进程,或者作为另一个函数的参数,于是Celery为此使用了一种称为signature的函数。它包装单个“ 任务调用” 的参数和执行选项,这个 signature 可以传递给函数,甚至可以序列化并通过网络发送。

    创建一个任务签名:

    >>> add.signature((2, 2), countdown=10)
    tasks.add(2, 2)
    
    # 快捷方式:
    >>> add.s(2, 2)
    tasks.add(2, 2)
    

    一个签名也是一个任务,也可以调用delayapply_async方法执行,区别在于签名可能已经指定了参数签名,完整的签名可以直接执行:

    >>> s1 = add.s(2, 2)
    >>> res = s1.delay()
    >>> res.get()
    4
    

    也可以创建不完整的签名,在在调用签名时补全其他参数(重复的参数会被新参数替代):

    >>> s2 = add.s(2)
    >>> res = s2.delay(8)
    >>> res.get()
    10
    

    所以,创建一个函数签名到底用来干啥呢?下面要说的 canvas 原语会用到。


    4.2 原语

    所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断。我们执行异步任务时,也可能会遇到这样的业务场景,即一组任务要么全部成功,要么全部失败。 canvas 原语就是用来定义这一组任务的执行,通过多种方式组合它们以构成复杂的工作流程。

    canvas 原语包括以下六种:


    Groups

    一个 group 会并行调用任务列表,返回一个特殊的结果实例,该实例让我们可以将结果作为一个组进行检查,并按顺序检索返回值。

    >>> from celery import group
    >>> from proj.tasks import add
    
    >>> group(add.s(i, i) for i in range(10))().get()
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    

    使用不完整签名函数的group:

    >>> g = group(add.s(i) for i in range(10))
    >>> g(10).get()
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    

    Chains

    链式调用,前者的结果作为后者的一个输入:

    >>> from celery import chain
    >>> from proj.tasks import add, mul
    
    # (4 + 4) * 8
    >>> chain(add.s(4, 4) | mul.s(8))().get()
    64
    

    partial chain:

    >>> # (? + 4) * 8
    >>> g = chain(add.s(4) | mul.s(8))
    >>> g(4).get()
    64
    

    可以直接省略chain关键字:

    >>> (add.s(4, 4) | mul.s(8))().get()
    64
    

    Chords

    chords 用于调用一个 group 的结果:

    >>> from celery import chord
    >>> from proj.tasks import add, xsum
    
    >>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
    90
    

    一个连接到 group 的 chain 会自动转化为一个chord:

    >>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
    90
    

    由于原语都是用签名函数,所以可以随意组合,如:

    >>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
    

    有关工作流程的更多信息:Canvas


    五、路由

    Celery支持AMQP协议提供的所有路由功能,也支持简单的路由规则,即:将消息发送的指定的队列。

    task_routes参数可以按名称路由任务,并将所有内容集中在一个位置:

    app.conf.update(
        task_routes = {
            'proj.tasks.add': {'queue': 'hipri'},
        },
    )
    

    然后,可以让 worker进程 在指定的队列中拿取任务:

    $ celery -A proj worker -Q hipri
    

    还可以为 worker进程 指定多个队列,例如让 worker进程 从 hipri队列 和 默认队列 中拿取任务(由于历史原因,celery队列就是默认队列):

    $ celery -A proj worker -Q hipri,celery
    

    队列的顺序无关紧要,因为 worker进程 将给予队列同等的权重,要了解有关路由的更多信息,包括充分利用AMQP路由功能,参阅Routing Guide


    六、远程监控

    如果使用RabbitMQ,Redis或Qpid作为代理,则可以在运行时监控 worker进程。

    例如,可以查看worker进程当前正在执行的任务:

    $ celery -A proj inspect active
    

    这是通过使用广播消息传递实现的,因此群集中的每个 worker 都将接收所有远程控制命令。如果未提供目的地,那么每个 worker 都会响应并回复请求,使用--destination选项指定一个或多个 worker 对请求执行操作,这是 worker 主机名的逗号分隔列表:

    $ celery -A proj inspect active --destination=celery@example.com
    

    其他更多监控命令,参考Monitoring Guide


    七、时区

    Celery内部和消息默认使用UTC时区,当 worker 收到一条消息(例如设置了倒计时)时,它将该UTC时间转换为本地时间。如果希望使用与系统时区不同的时区,则必须使用以下timezone设置进行配置:

    app.conf.timezone = 'Asia/Shanghai'
    

    八、进一步优化

    默认配置未针对吞吐量进行优化,默认情况下,它尝试在许多短期任务和较少的长期任务之间折衷,即吞吐量和公平调度间的折衷。

    如果有严格的公平调度要求,或者要针对吞吐量进行优化,参阅Optimizing Guide

    如果使用RabbitMQ,则可以安装librabbitmq模块,这是用C实现的AMQP客户端:

    $ pip install librabbitmq
    
  • 相关阅读:
    Interview with BOA
    Java Main Differences between HashMap HashTable and ConcurrentHashMap
    Java Main Differences between Java and C++
    LeetCode 33. Search in Rotated Sorted Array
    LeetCode 154. Find Minimum in Rotated Sorted Array II
    LeetCode 153. Find Minimum in Rotated Sorted Array
    LeetCode 75. Sort Colors
    LeetCode 31. Next Permutation
    LeetCode 60. Permutation Sequence
    LeetCode 216. Combination Sum III
  • 原文地址:https://www.cnblogs.com/yangblood/p/14508936.html
Copyright © 2011-2022 走看看