zoukankan      html  css  js  c++  java
  • Celery + Redis 的探究

    Celery + Redis 的探究

    文本尝试研究,使用 redis 作为 celery 的 broker 时,celery 的交互操作同 redis 中数据记录的关联关系。

    不在乎过程的,可以直接看最后的结论

    测试代码:

    # a.py
    from celery import Celery
    
    celery_app = Celery('a', broker='redis://localhost:6379/0')
    
    @celery_app.task
    def test_task(n):
        open('test.txt', 'a').write(n + '
    ')
        print n
    
    if __name__ == '__main__':
        test_task.delay('==== ttttt1 =====')

    先将 redis 部署于本机的 6379 默认端口 不要设置密码,使用 celery 版本 3.1.23


    [1]
    先直接发起一个 task

    python a.py
    

    执行后可看到 redis 上生成了两个 key

     

    celery:表示当前正在队列中的 task,等待被 worker 所接收
    _kombu.binding.celery:这个不用管(celery 使用 kombu 维护消息队列,这个是 kombu 生成的对逻辑影响不大)

    然后启动一个 worker

    celery worker -A a --loglevel=debug
    执行后可看到 celery 这个 key 消失了,同时新增了 2 个 key
     

    celery 消失说明任务已经被刚启动的 worker 接收了,worker 会自己去执行这个 task,当前没有等待被接收的任务
    _kombu.binding.celery.pidbox:这个也不用管(也是 kombu 维护的)
    _kombu.binding.celeryev:这个也不用管(也是 kombu 维护的,用来记下当前连接的 worker)


    [2]
    下面我们试一下延时任务
    将代码中的

    test_task.delay('==== ttttt1 =====')
    

    改成

    test_task.apply_async(('==== ttttt2 =====', ), countdown=60)
    

    然后启动脚本,发起一个60秒后执行任务,并且启动 celery worker 准备执行任务

    python a.py
    celery worker -A a --loglevel=debug
    

    在 60 秒内查看 redis,可以看到没有出现 celery 这个 key,但多出了另外两个 key

     

    unacked:可以理解为这个是被 worker 接收了但是还没开始执行的 task 列表(因为60秒后才会开始执行)
    unacked_index:用户标记上面 unacked 的任务的 id,理论上应该与 unacked 一一对应的

    60 秒后再次查看 redis,可以看到又回到了无任务的状态

     

    这表示被 worker 领取的任务确实在 60 秒后执行了


    [3]
    这里在尝试一种异常的情况,worker 领取任务后还没到 60 秒,突然遇到问题退出了

    python a.py
    celery worker -A a --loglevel=debug

    等大约 10 秒后,ctrl+c 中断 worker
    可以看到 redis 中有 celery 这个 key,其中有一条等待领取的任务

    image.png

    再次启动 worker

    celery worker -A a --loglevel=debug
    

    可以发现任务被再次正常领取和执行


    结论,由此可以推测出 celery 和 redis 之间交互的基本原理:

    1、当发起一个 task 时,会向 redis 的 celery key 中插入一条记录。
    2、如果这时有正在待命的空闲 worker,这个 task 会立即被 worker 领取。
    3、如果这时没有空闲的 worker,这个 task 的记录会保留在 celery key 中。
    4、这时会将这个 task 的记录从 key celery 中移除,并添加相关信息到 unackedunacked_index 中。
    5、worker 根据 task 设定的期望执行时间执行任务,如果接到的不是延时任务或者已经超过了期望时间,则立刻执行。
    6、worker 开始执行任务时,通知 redis。(如果设置了 CELERY_ACKS_LATE = True 那么会在任务执行结束时再通知)
    7、redis 接到通知后,将 unackedunacked_index 中相关记录移除。
    8、如果在接到通知前,worker 中断了,这时 redis 中的 unacked 和 unacked_index 记录会重新回到 celery key 中。(这个回写的操作是由 worker 在 “临死” 前自己完成的,所以在关闭 worker 时为防止任务丢失,请务必使用正确的方法停止它,如: celery multi stop w1 -A proj1)
    9、在 celery key 中的 task 可以再次重复上述 2 以下的流程。
    10、celery 只是利用 redis 的 list 类型,当作个简单的 Queue,并没有使用消息订阅等功能


    题外话:

    1、启动 celery worker 时可以加上 -B 参数使得 schedule 定时任务生效,但要注意如果为同一个项目启动多个 worker 时,只需要其中一个启动命令中加上 -B,否则 schedule 会被多次执行。
    2、上面的 1 同时也说明了 schedule task 的执行是由 celery 发起的。也就是说,如果在 django 中使用了 CELERYBEAT_SCHEDULE,那么只要 celery worker -B 启动了,即使 django web 服务没有启动,定时任务也一样会被发起。(推荐使用专门的 celery beat 方法)
    3、使用 flower 时,在上述的 “worker 领取任务后突然遇到问题退出了然后又重新启动执行” 这种情况下可能会出现显示不正常的问题,这个是否是 flower 的 bug 还是有其他原因,可能下篇再探究。

  • 相关阅读:
    Begin Example with Override Encoded SOAP XML Serialization
    State Machine Terminology
    How to: Specify an Alternate Element Name for an XML Stream
    How to: Publish Metadata for a WCF Service.(What is the Metadata Exchange Endpoint purpose.)
    Beginning Guide With Controlling XML Serialization Using Attributes(XmlSerializaiton of Array)
    Workflow 4.0 Hosting Extensions
    What can we do in the CacheMetaData Method of Activity
    How and Why to use the System.servicemodel.MessageParameterAttribute in WCF
    How to: Begin Sample with Serialization and Deserialization an Object
    A Test WCF Service without anything of config.
  • 原文地址:https://www.cnblogs.com/wangkun122/p/11158291.html
Copyright © 2011-2022 走看看