zoukankan      html  css  js  c++  java
  • Celery 源码解析八:State 和 Result

    序列文章:

    在前面几篇解析中,我们已经看过了 Worker 是如何运行的,Task 是如何创建的,以及怎么被路由到 Worker 中,除了这些之外,我们还对流量限制,Worker 控制和 Task/Worker 产生和处理 Event 进行了介绍。但这却不是全部,今天我将继续和大家一起来看看 Celery 的 Task Result 和 State 相关的内容。

    Task State

    celery/celery/states.py 中,可以看到 Task 的所有 State

    至于如果想要看 Task 的状态转变,那我们就需要回顾一下 Task 从产生到完成的过程,如果是一个普通的 Task 的话,那么它的执行流程应该是:

    从 Producer 产生 -①-> MQ -②-> Consumer 接收 -③-> 执行策略执行 -④-> 执行完成

    那么这里的 ①②③④ 有没有设置状态,并且状态是啥,就是我们关注的重点了,ok,那我们就跟随之前的脚步快速得发掘一番:

    这个是同步发送消息的时候,其实也是第一次出现 state 的时候,但是这个 state 已经是 ④ 之后的状态了,但是,无妨,我们已经知道了执行完成之后的状态是在这里 update 的。那么对于异步的情况又是如何更新状态的呢,那根据我们之前的 review,我们知道应该去找 AsyncResult

    所以这里就是 backend 的事情啦,backend 再甩锅给 backend Consumer,然后就是在 backend Consumer 里面干活了,最后就到了这:

    这里其实虽然是调用的 on_state_change 但是,我们应该知道的一点就是在这个时候其实状态已经变化了,那么我们就应该追述到上面的 result._iter_meta,看看里面的实现,这里我就飘过很多细节了,直接看最实际的地方:

    这里可以发现,其实结果是通过 Backend 组件完成的,例如 Redis 就放在 Key 对应的 Value 中,其他的 Backend 也类似,如果没有的话,在 Line 667 我们就知道了,状态就是 PENDING 啦,第二个状态捕获!然后还有 Line 668 这里的解析结果也是值得我们一看,我直接看最后吧:

    可以发现这里也是很有趣的,如果 Task 的状态是异常状态的话,还需要进行异常回溯,其实就是把异常信息解压出来,展示给我们看。

    那我们是时候我们去找找第三个状态:Received 了,我们可以猜测一下大概可能出现的位置,这里就是:

    这里可以发现居然是有一个 EVENT 发送出去,然后我们就应该去看看处理这个 Event 的人啦,如果你还有印象的话,那么你应该可以很容易得追踪到:

    这里你会发现你感兴趣的所有东西,哈哈,其实你需要看看一个字典:

    你会发现所有的状态都可以在这里更新,哈哈。所以下面我就列一下各个 EVENT 的发出地点就好了:

    如果你够细心的话,你会发现 failedsucceeded 都是有两处发送地点,有兴趣猜测一下原因么?

    Task Result

    其实在前面说 State 的过程中我们已经看到了很多和 Result 有关的东西了。Result 可以认为就两种,同步和异步的,其实也可以看成一种;但是,从另一个维度看,Result 其实也可看成成功的和失败的,anyway,这些都是在 Result 中保存了的。

    我在这里就简单回顾看看 第三篇 中看过的内容,其实 Result 中就放了这几样东西:


    分别是 任务ID返回值执行状态 以及 错误堆栈信息,我们在调用的地方用这些信息就足够了。

    Worker State

    除了 Task 有状态之外,Worker 其实也是有状态的,回顾我们 第一篇 的内容,Worker 的启动过程这么冗长,所以不是说一运行就到了 Runing 状态了,中间肯定是有各种状态的,所以我们不妨一起来看看。

    回顾一下我们之前看的一些代码,我们应该是有讲过一个叫做 HeartBeat 的组件,它的作用就是保持发送心跳,告诉其他 Worker 这个 Worker 还活着,那么这是一种状态,除了这些状态之外,我们还可以找到的其他 Worker 状态有:

    • worker-offline
    • worker-online
    • worker-heartbeat

    Worker-heartbeat 我们很清楚它在哪里发送出去的了,那么 Worker-onlineWorker-offline 又是在哪里发出去的呢?其实都一样的,我们看到 Heart 这个 Bootstep

    着重看看 startstop 方法,我们会发现其实调用的都是同一个地方:

    一切了然于胸!

  • 相关阅读:
    外设驱动库开发笔记5:AD7705系列ADC驱动
    ROS+LEDE最强上网软路由
    Flume1.9.0的安装、部署、简单应用(含分布式、与Hadoop3.1.2、Hbase1.4.9的案例)
    通过 Sqoop1.4.7 将 Mysql5.7、Hive2.3.4、Hbase1.4.9 之间的数据导入导出
    Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建
    Centos7 二进制安装 Kubernetes 1.13
    Centos7 使用 kubeadm 安装Kubernetes 1.13.3
    go get获取gitlab私有仓库的代码
    Nginx设置Https反向代理,指向Docker Gitlab11.3.9 Https服务
    Docker 创建 Bamboo6.7.1 以及与 Crowd3.3.2 实现 SSO 单点登录
  • 原文地址:https://www.cnblogs.com/makor/p/implement-of-state-and-result-in-celery.html
Copyright © 2011-2022 走看看