zoukankan      html  css  js  c++  java
  • Celery + Redis 的探究

    Celery + Redis 的探究

    文本尝试研究,使用 redis 作为 celery 的 broker 时,celery 的交互操作同 redis 中数据记录的关联关系。

    不在乎过程的,可以直接看最后的结论

    测试代码:

    # a.py
    from celery import Celery
    
    celery_app = Celery('a', broker='redis://localhost:6379/0')
    
    @celery_app.task
    def test_task(n):
        open('test.txt', 'a').write(n + '
    ')
        print n
    
    if __name__ == '__main__':
        test_task.delay('==== ttttt1 =====')

    先将 redis 部署于本机的 6379 默认端口 不要设置密码,使用 celery 版本 3.1.23


    [1]
    先直接发起一个 task

    python a.py
    

    执行后可看到 redis 上生成了两个 key

     

    celery:表示当前正在队列中的 task,等待被 worker 所接收
    _kombu.binding.celery:这个不用管(celery 使用 kombu 维护消息队列,这个是 kombu 生成的对逻辑影响不大)

    然后启动一个 worker

    celery worker -A a --loglevel=debug
    执行后可看到 celery 这个 key 消失了,同时新增了 2 个 key
     

    celery 消失说明任务已经被刚启动的 worker 接收了,worker 会自己去执行这个 task,当前没有等待被接收的任务
    _kombu.binding.celery.pidbox:这个也不用管(也是 kombu 维护的)
    _kombu.binding.celeryev:这个也不用管(也是 kombu 维护的,用来记下当前连接的 worker)


    [2]
    下面我们试一下延时任务
    将代码中的

    test_task.delay('==== ttttt1 =====')
    

    改成

    test_task.apply_async(('==== ttttt2 =====', ), countdown=60)
    

    然后启动脚本,发起一个60秒后执行任务,并且启动 celery worker 准备执行任务

    python a.py
    celery worker -A a --loglevel=debug
    

    在 60 秒内查看 redis,可以看到没有出现 celery 这个 key,但多出了另外两个 key

     

    unacked:可以理解为这个是被 worker 接收了但是还没开始执行的 task 列表(因为60秒后才会开始执行)
    unacked_index:用户标记上面 unacked 的任务的 id,理论上应该与 unacked 一一对应的

    60 秒后再次查看 redis,可以看到又回到了无任务的状态

     

    这表示被 worker 领取的任务确实在 60 秒后执行了


    [3]
    这里在尝试一种异常的情况,worker 领取任务后还没到 60 秒,突然遇到问题退出了

    python a.py
    celery worker -A a --loglevel=debug

    等大约 10 秒后,ctrl+c 中断 worker
    可以看到 redis 中有 celery 这个 key,其中有一条等待领取的任务

    image.png

    再次启动 worker

    celery worker -A a --loglevel=debug
    

    可以发现任务被再次正常领取和执行


    结论,由此可以推测出 celery 和 redis 之间交互的基本原理:

    1、当发起一个 task 时,会向 redis 的 celery key 中插入一条记录。
    2、如果这时有正在待命的空闲 worker,这个 task 会立即被 worker 领取。
    3、如果这时没有空闲的 worker,这个 task 的记录会保留在 celery key 中。
    4、这时会将这个 task 的记录从 key celery 中移除,并添加相关信息到 unackedunacked_index 中。
    5、worker 根据 task 设定的期望执行时间执行任务,如果接到的不是延时任务或者已经超过了期望时间,则立刻执行。
    6、worker 开始执行任务时,通知 redis。(如果设置了 CELERY_ACKS_LATE = True 那么会在任务执行结束时再通知)
    7、redis 接到通知后,将 unackedunacked_index 中相关记录移除。
    8、如果在接到通知前,worker 中断了,这时 redis 中的 unacked 和 unacked_index 记录会重新回到 celery key 中。(这个回写的操作是由 worker 在 “临死” 前自己完成的,所以在关闭 worker 时为防止任务丢失,请务必使用正确的方法停止它,如: celery multi stop w1 -A proj1)
    9、在 celery key 中的 task 可以再次重复上述 2 以下的流程。
    10、celery 只是利用 redis 的 list 类型,当作个简单的 Queue,并没有使用消息订阅等功能


    题外话:

    1、启动 celery worker 时可以加上 -B 参数使得 schedule 定时任务生效,但要注意如果为同一个项目启动多个 worker 时,只需要其中一个启动命令中加上 -B,否则 schedule 会被多次执行。
    2、上面的 1 同时也说明了 schedule task 的执行是由 celery 发起的。也就是说,如果在 django 中使用了 CELERYBEAT_SCHEDULE,那么只要 celery worker -B 启动了,即使 django web 服务没有启动,定时任务也一样会被发起。(推荐使用专门的 celery beat 方法)
    3、使用 flower 时,在上述的 “worker 领取任务后突然遇到问题退出了然后又重新启动执行” 这种情况下可能会出现显示不正常的问题,这个是否是 flower 的 bug 还是有其他原因,可能下篇再探究。

  • 相关阅读:
    mabatis配置文件yml配置打印sql
    java使用validator检验bean
    vue项目 老是报错 气的我就不行
    注入为空
    软件测试基础
    单元测试实战
    软件测试基础
    For循环案例---九九乘法表
    软件测试基础
    软件测试基础
  • 原文地址:https://www.cnblogs.com/wangkun122/p/11158291.html
Copyright © 2011-2022 走看看