zoukankan      html  css  js  c++  java
  • [源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat

    [源码分析] 并行分布式任务队列 Celery 之 Timer & Heartbeat

    0x00 摘要

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

    之前我们用了十几篇文章,介绍了 Kombu 和 Celery 的基础功能。从本文开始,我们介绍 Celery 的一些辅助功能(比如负载均衡,容错等等)。其实从某种意义上来说,这些辅助功能更加重要。

    本文我们介绍 Timer 和 Heart 这两个组件。大家可以看看底层设计是如何影响上层实现的。

    [源码分析] 消息队列 Kombu 之 mailbox

    [源码分析] 消息队列 Kombu 之 Hub

    [源码分析] 消息队列 Kombu 之 Consumer

    [源码分析] 消息队列 Kombu 之 Producer

    [源码分析] 消息队列 Kombu 之 启动过程

    [源码解析] 消息队列 Kombu 之 基本架构

    [源码解析] 并行分布式框架 Celery 之架构 (1)

    [源码解析] 并行分布式框架 Celery 之架构 (2)

    [源码解析] 并行分布式框架 Celery 之 worker 启动 (1)

    [源码解析] 并行分布式框架 Celery 之 worker 启动 (2)

    [源码解析] 分布式任务队列 Celery 之启动 Consumer

    [源码解析] 并行分布式任务队列 Celery 之 Task是什么

    [从源码学设计]celery 之 发送Task & AMQP

    [源码解析] 并行分布式任务队列 Celery 之 消费动态流程

    [源码解析] 并行分布式任务队列 Celery 之 多进程模型

    [源码分析] 分布式任务队列 Celery 多线程模型 之 子进程

    [源码分析]并行分布式任务队列 Celery 之 子进程处理消息

    0x01 Blueprint

    Celery 的 Worker初始化过程中,其内部各个子模块的执行顺序是由一个BluePrint类定义,并且根据各个模块之间的依赖进行排序(实际上把这种依赖关系组织成了一个 DAG)执行。

    Celery worker 的 Blueprint 如下,我们可以看到 Timer,Hub 是 Celery Worker 的两个基本组件,提到 hub 是因为后面讲解需要用到。

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""
    
        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub', # 这里是 Hub
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer', # 这里是 Timer
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }
    

    0x02 Timer Step

    我们首先来到 Timer Step。

    从 Timer 组件 的定义中可以看到,Timer 组件 会根据当前worker是否使用事件循环机制来决定创建什么类型的timer

    • 如果使用 eventloop,则使用kombu.asynchronous.timer.Timer as _Timer,这里具体等待动作由用户自己完成
    • 否则使用 Pool 内部的Timer类(就是 timer_cls='celery.utils.timer2.Timer'),timer2 自己做了一个线程来做定时等待

    定义如下:

    from kombu.asynchronous.timer import Timer as _Timer
    
    class Timer(bootsteps.Step):
        """Timer bootstep."""
    
        def create(self, w):
            if w.use_eventloop:                        # 检查传入的Worker是否使用了use_eventloop
                # does not use dedicated timer thread.
                w.timer = _Timer(max_interval=10.0)    # 直接使用kombu的timer做定时器
            else:
                if not w.timer_cls:                     # 如果配置文件中没有配置timer_clas
                    # Default Timer is set by the pool, as for example, the
                    # eventlet pool needs a custom timer implementation.
                    w.timer_cls = w.pool_cls.Timer      # 使用缓冲池中的Timer
                w.timer = self.instantiate(w.timer_cls,
                                           max_interval=w.timer_precision,
                                           on_error=self.on_timer_error,
                                           on_tick=self.on_timer_tick)  # 导入对应的类并实例化
    

    起初看代码时候很奇怪,为什么要再单独定义一个 timer2?

    原因推断是(因为对 Celery 的版本发展历史不清楚,所以此处不甚确定,希望有同学可以指正):依据 底层 Transport 的设计来对 Timer 做具体实现调整

    2.1 Transport

    大家知道,Celery 是依赖于 Kombu,而在 Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。

    我们再回顾下具体 Kombu 的概念:

    • Connection 是 AMQP 对 连接的封装;
    • Channel 是 AMQP 对 MQ 操作的封装;

    那么两者的关系就是对 MQ 的操作(Channel)必然离不开连接(Connection),但是 Kombu 并不直接让 Channel 使用 Connection 来发送 / 接受请求,而是引入了一个新的抽象 Transport。Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行;

    Transport 代表真实的 MQ 连接,也是真正连接到 MQ( redis / rabbitmq )的实例。就是存储和发送消息的实体,用来区分底层消息队列是用 amqp、Redis 还是其它实现的。

    具体 Kombu 逻辑如下图,Transport 在左下角处 :

    在这里插入图片描述

    2.2 Thread-less VS Thread-based

    对于 Transport,某些 rate-limit implementation(比如 RabbitMQ / Redis ) 为了减少开销,采用了event-loop(底层使用了 Epoll),是 thread-less and lock-free

    而其他旧类型的 Transport 就是 Thread based,比如 Mongo。因此,

    • 对于 Thread-less Transport

      • Kombu 就采用了 kombu.asynchronous.timer.Timer as _Timer,具体等待操作是在 event-loop 中实现,就是 调用者 自己会做等待。

      • 具体比如在 Redis Transport 之中,就有 register_with_event_loop 函数用来在 loop(就是 event-loop)中注册自己,具体如下:

      • def register_with_event_loop(self, connection, loop):
            cycle = self.cycle
            cycle.on_poll_init(loop.poller)
            cycle_poll_start = cycle.on_poll_start
            add_reader = loop.add_reader
            on_readable = self.on_readable
        
            def on_poll_start():
                cycle_poll_start()
                [add_reader(fd, on_readable, fd) for fd in cycle.fds]
            loop.on_tick.add(on_poll_start)
            loop.call_repeatedly(10, cycle.maybe_restore_messages)
            loop.call_repeatedly(
                health_check_interval,
                cycle.maybe_check_subclient_health
            )
        
    • 对于 thread-based Transport,

      • 则采用了 celery.utils.timer2.Timer,timer2 自己继承了线程类,使用自己这个线程来做定时等待
      • 比如在 Mongodb transport 之中,就没有任何关于 event loop 的操作。

    即,选用 timer 的哪种实现,看是否需要等待来决定,就是谁来完成 “等待” 这个动作

    翻了翻 Celery 2.4.7 的代码,发现在这个版本,确实只有 Thread-based timer,其代码涵盖了 目前的 timer 2 和 kombu.asynchronous.timer.Timer 大部分功能。应该是从 3.0.2 之后,把部分代码分离到了 kombu.asynchronous.timer.Timer ,实现了 Thread-less 和 Thread-based 两个不同的实现。

    具体可以参见下面源码中的注释:

    - RabbitMQ/Redis: thread-less and lock-free rate-limit implementation.
    
        This means that rate limits pose minimal overhead when used with
        RabbitMQ/Redis or future transports using the event-loop,
        and that the rate-limit implementation is now thread-less and lock-free.
    
        The thread-based transports will still use the old implementation for
        now, but the plan is to use the timer also for other
        broker transports in Celery 3.1.
    

    0x03 Timer in Pool

    注意,上面的是 Timer Step,是一个启动的阶段,其目的是生成 Timer 组件 给 其他组件使用,并不是 Timer 功能类

    我们其次来看看 Timer 功能类 在 线程池 Pool 中的使用,就对应了前面 Blueprint step 之中的两种不同 cases。

    分别也对应了两种应用场景(或者说是线程池实现):

    • gevent 和 eventlet 使用 kombu.asynchronous.timer.Timer
    • BasePool(以及其他类型线程池)使用了 timer2.Timer。

    初步来分析,gevent 和 eventlet 都是用协程来模拟线程,所以本身具有Event loop,因此使用 kombu.asynchronous.timer.Timer 也算顺理成章。

    3.1 gevent 和 eventlet

    对于 gevent,eventlet 这种情况,使用了 class Timer(_timer.Timer) 作为 Timer 功能类。

    从代码中可以看到,class Timer 扩展了 kombu.asynchronous.timer.Timer

    from kombu.asynchronous import timer as _timer
    
    class Timer(_timer.Timer):
    
        def __init__(self, *args, **kwargs):
            from gevent import Greenlet, GreenletExit
    
            class _Greenlet(Greenlet):
                cancel = Greenlet.kill
    
            self._Greenlet = _Greenlet
            self._GreenletExit = GreenletExit
            super().__init__(*args, **kwargs)
            self._queue = set()
    
        def _enter(self, eta, priority, entry, **kwargs):
            secs = max(eta - monotonic(), 0)
            g = self._Greenlet.spawn_later(secs, entry)
            self._queue.add(g)
            g.link(self._entry_exit)
            g.entry = entry
            g.eta = eta
            g.priority = priority
            g.canceled = False
            return g
    
        def _entry_exit(self, g):
            try:
                g.kill()
            finally:
                self._queue.discard(g)
    
        def clear(self):
            queue = self._queue
            while queue:
                try:
                    queue.pop().kill()
                except KeyError:
                    pass
    
        @property
        def queue(self):
            return self._queue
    

    3.2 BasePool

    而 BasePool 采用了 timer2 . Timer 作为 Timer 功能类。

    from celery.utils import timer2
    
    class BasePool:
        """Task pool."""
    
        Timer = timer2.Timer
    

    下面我们具体看看 Timer 功能类 如何实现。

    0x04 kombu.Timer

    4.1 异步

    kombu.asynchronous.timer.Timer 实现了异步Timer。

    由其注释可以,kombu.asynchronous.timer.Timer 在调用者每次得到下一次entry时,会给出tuple of (wait_seconds, entry)调用者应该进行等待相应时间

    即,kombu.Timer是调用者等待,普通timer是timer自己启动线程等待

    """Iterate over schedule.
    This iterator yields a tuple of ``(wait_seconds, entry)``,
    where if entry is :const:`None` the caller should wait
    for ``wait_seconds`` until it polls the schedule again.
    """
    

    定义如下:

    class Timer:
        """Async timer implementation."""
    
        Entry = Entry
    
        on_error = None
    
        def __init__(self, max_interval=None, on_error=None, **kwargs):
            self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
            self.on_error = on_error or self.on_error
            self._queue = []
    

    4.2 调用

    4.2.1 添加 timer function

    用户通过 call_repeatedly 来添加 timer function。

    def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
        kwargs = {} if not kwargs else kwargs
        tref = self.Entry(fun, args, kwargs)
    
        @wraps(fun)
        def _reschedules(*args, **kwargs):
            last, now = tref._last_run, monotonic()
            lsince = (now - tref._last_run) if last else secs
            try:
                if lsince and lsince >= secs:
                    tref._last_run = now
                    return fun(*args, **kwargs) # 调用用户方法
            finally:
                if not tref.canceled:
                    last = tref._last_run
                    next = secs - (now - last) if last else secs
                    self.enter_after(next, tref, priority)
    
        tref.fun = _reschedules
        tref._last_run = None
        return self.enter_after(secs, tref, priority)
    

    4.2.2 调用

    Timer通过apply_entry进行调用。

    def apply_entry(self, entry):
        try:
            entry()
        except Exception as exc:
            if not self.handle_error(exc):
                logger.error('Error in timer: %r', exc, exc_info=True)
    

    在获取下一次entry时,会返回等待时间。

    def __iter__(self, min=min, nowfun=monotonic,
                 pop=heapq.heappop, push=heapq.heappush):
        """Iterate over schedule.
    
        This iterator yields a tuple of ``(wait_seconds, entry)``,
        where if entry is :const:`None` the caller should wait
        for ``wait_seconds`` until it polls the schedule again.
        """
        max_interval = self.max_interval
        queue = self._queue
    
        while 1:
            if queue:
                eventA = queue[0]
                now, eta = nowfun(), eventA[0]
    
                if now < eta:
                    yield min(eta - now, max_interval), None
                else:
                    eventB = pop(queue)
    
                    if eventB is eventA:
                        entry = eventA[2]
                        if not entry.canceled:
                            yield None, entry
                        continue
                    else:
                        push(queue, eventB)
            else:
                yield None, None
    

    4.3 实验

    我们做实验看看 timer 功能类 的 使用。

    4.3.1 示例代码

    下面代码来自https://github.com/liuliqiang/blog_codes/tree/master/python/celery/kombu,特此感谢。

    def main(arguments):
        hub = Hub()
        exchange = Exchange('asynt')
        queue = Queue('asynt', exchange, 'asynt')
    
        def send_message(conn):
            producer = Producer(conn)
            producer.publish('hello world', exchange=exchange, routing_key='asynt')
            print('message sent')
    
        def on_message(message):
            print('received: {0!r}'.format(message.body))
            message.ack()
            # hub.stop()  # <-- exit after one message
    
        conn = Connection('redis://localhost:6379')
        conn.register_with_event_loop(hub)
    
        def p_message():
            print('redis://localhost:6379')
    
        with Consumer(conn, [queue], on_message=on_message):
            send_message(conn)
            hub.timer.call_repeatedly(
                3, p_message
            )
            hub.run_forever()
    
    
    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
    

    这里,Hub 就是 timer 的客户

    得到Stack如下,可以看到 hub 使用 timer 做了消息循环,于是我们需要看看 hub:

    p_message
    _reschedules, timer.py:127
    __call__, timer.py:65
    fire_timers, hub.py:142
    create_loop, hub.py:300
    run_once, hub.py:193
    run_forever, hub.py:185
    main, testUb.py:46
    <module>, testUb.py:50
    

    启动时候的逻辑如下,hub 通过 hub.timer.call_repeatedly 设置了需要调用的用户函数 fun,在 Timer 内部,fun 被包装设置为 _reschedules。

        Hub
         +
         |                                         +----------------------------------+
         |                                         |  kombu.asynchronous.timer.Timer  |
         |                                         |                                  |
         |                call_repeatedly(fun)     |                                  |
         |                                         |                                  |
         +---------------------------------------------->  _reschedules [@wraps(fun)] |
         |                                         |                                  |
         |                                         |                                  |
         |                                         |                                  |
         |                                         +----------------------------------+
         |
         |
         v
    
    

    4.3.2 Hub 的使用

    以下代码是Hub类,在这里,Hub 就是 timer 的用户。

    可以看到,hub 建立了message_loop。在 loop 中,hub 会:

    • 使用 fire_timers 进行 timer 处理,会设置下一次 timer。
    • 得到 poll_timeout 后,会进行处理或者 sleep

    下面是简化版代码。

    def create_loop():
    
        while 1:
    
            poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
    
            if readers or writers:
    
                events = poll(poll_timeout)
    
                for fd, event in events or ():
    
                    if event & READ:
                        try:
                            cb, cbargs = readers[fd]
                        try:
                            cb(*cbargs)
                        except Empty:
                            pass
    
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
            yield
    

    我们再看看 fire_timers,这就是调用用户方法。

    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
                    propagate=()):
        timer = self.timer
        delay = None
        
        if timer and timer._queue:
            for i in range(max_timers):
                delay, entry = next(self.scheduler)
                if entry is None:
                    break
    
                entry()# 调用用户方法
                
        return min(delay or min_delay, max_delay)
    

    使用Entry调用用户方法

    class Entry:
        """Schedule Entry."""
    
        def __call__(self):
            return self.fun(*self.args, **self.kwargs)# 调用用户方法
    

    具体逻辑如下:

    +--------------------------+
    |                          |
    |              Hub         |
    |               +          |
    |               |          |                        +----------------------------------+
    |               |          |                        |  kombu.asynchronous.timer.Timer  |
    |               |          |                        |                                  |
    |               |          |  call_repeatedly(fun)  |                                  |
    |               |          |                        |                                  |
    |               +---------------------------------------->  _reschedules [@wraps(fun)] |
    |               |          |                        |                                  |
    |               |          |                        |                                  |
    |               |          |                        |                                  |
    |               |          |                        +----------------------------------+
    |            create_loop   |
    |               +          |                                   ^
    |               |          |                                   |
    |               |          |                                   |
    |               v          |                                   |
    |                          |                                   |
    |   +--->  message_loop    |                                   |
    |   |           +          |                                   |
    |   |           |          |                                   |
    |   |           v          |        iter(self.timer)           |
    |   |       fire_timers +--------------------------------------+
    |   |           +          |
    |   |           |          |
    |   |           v          |
    |   |         poll         |
    |   |           +          |
    |   |           |          |
    |   |           v          |
    |   |         sleep        |
    |   |           +          |
    |   |           |          |
    |   +-----------+          |
    +--------------------------+
    

    0x05 timer2

    celery/utils/timer2.py中定义了Timer类实例,可以看出其继承了threading.Thread,但是居然也用kombu.asynchronous.timer

    在源码注释中有:This is only used for transports not supporting AsyncIO

    其实,就是 timer2 自己做了一个线程来做定时sleep等待,然后调用用户方法而已

    from kombu.asynchronous.timer import Entry
    from kombu.asynchronous.timer import Timer as Schedule
    from kombu.asynchronous.timer import logger, to_timestamp
    
    class Timer(threading.Thread): # 扩展了 线程
        """Timer thread.
    
        Note:
            This is only used for transports not supporting AsyncIO.
        """
    
        Entry = Entry
        Schedule = Schedule
    
        running = False
        on_tick = None
    
        _timer_count = count(1)
    

    在run方法中,会定期sleep。

    def run(self):
        try:
            self.running = True
            self.scheduler = iter(self.schedule)
    
            while not self._is_shutdown.isSet():
                delay = self._next_entry()
                if delay:
                    if self.on_tick:
                        self.on_tick(delay)
                    if sleep is None:  # pragma: no cover
                        break
                    sleep(delay)
            try:
                self._is_stopped.set()
            except TypeError:  # pragma: no cover
                # we lost the race at interpreter shutdown,
                # so gc collected built-in modules.
                pass
        except Exception as exc:
            sys.stderr.flush()
            os._exit(1)
    

    在_next_entry方法中,调用用户方法,这是通过kombu.asynchronous.timer完成的。

    def _next_entry(self):
        with self.not_empty:
            delay, entry = next(self.scheduler)
            if entry is None:
                if delay is None:
                    self.not_empty.wait(1.0)
                return delay
        return self.schedule.apply_entry(entry)
    __next__ = next = _next_entry  # for 2to3
    

    0x06 Heart

    Timer 类主要是做一些定时调度方面的工作。

    Heart 组件 就是使用 Timer组件 进行定期调度,发送心跳 Event,告诉其他 Worker 这个 Worker 还活着。

    同时,当本worker 启动,停止时候,也发送 worker-online,worker-offline 这两种消息。

    6.1 Heart in Bootstep

    位置在:celery/worker/consumer/heart.py。

    其作用就是启动 heart 功能类。

    class Heart(bootsteps.StartStopStep):
        """Bootstep sending event heartbeats.
    
        This service sends a ``worker-heartbeat`` message every n seconds.
    
        Note:
            Not to be confused with AMQP protocol level heartbeats.
        """
    
        requires = (Events,)
    
        def __init__(self, c,
                     without_heartbeat=False, heartbeat_interval=None, **kwargs):
            self.enabled = not without_heartbeat
            self.heartbeat_interval = heartbeat_interval
            c.heart = None
            super().__init__(c, **kwargs)
    
        def start(self, c):
            c.heart = heartbeat.Heart(
                c.timer, c.event_dispatcher, self.heartbeat_interval,
            )
            c.heart.start()
    
        def stop(self, c):
            c.heart = c.heart and c.heart.stop()
        shutdown = stop
    

    6.2 Heart in Consumer

    位置在:celery/worker/heartbeat.py。可以看到就是从启动之后,使用 call_repeatedly 定期发送心跳

    class Heart:
        """Timer sending heartbeats at regular intervals.
    
        Arguments:
            timer (kombu.asynchronous.timer.Timer): Timer to use.
            eventer (celery.events.EventDispatcher): Event dispatcher
                to use.
            interval (float): Time in seconds between sending
                heartbeats.  Default is 2 seconds.
        """
    
        def __init__(self, timer, eventer, interval=None):
            self.timer = timer
            self.eventer = eventer
    
        def _send(self, event, retry=True):
            return self.eventer.send(event, freq=self.interval, ...)
    
        def start(self):
            if self.eventer.enabled:
                self.tref = self.timer.call_repeatedly(
                    self.interval, self._send, ('worker-heartbeat',),
                )
    

    此时变量为:

    self = {Heart} <celery.worker.heartbeat.Heart object at 0x000001D377636408>
     eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
     interval = {float} 2.0
     timer = {Timer: 0} <Timer(Timer-1, stopped daemon)>
     tref = {NoneType} None
      _send_sent_signal = {NoneType} None
    

    6.3 worker-online

    当启动时候,发送 worker-online 消息。

        def start(self):
            if self.eventer.enabled:
                self._send('worker-online')
                self.tref = self.timer.call_repeatedly(
                    self.interval, self._send, ('worker-heartbeat',),
                )
    

    6.4 worker-offline

    当停止时候,发送 worker-offline 消息。

        def stop(self):
            if self.tref is not None:
                self.timer.cancel(self.tref)
                self.tref = None
            if self.eventer.enabled:
                self._send('worker-offline', retry=False)
    

    6.5 发送心跳

    Heart组件会调用 eventer 来群发心跳:

    • eventer 是 celery.events.dispatcher.EventDispatcher;
    • 心跳是 'worker-heartbeat' 这个 Event;

    所以我们下文就要分析 celery.events.dispatcher.EventDispatcher。

        def _send(self, event, retry=True):
            if self._send_sent_signal is not None:
                self._send_sent_signal(sender=self)
            return self.eventer.send(event, freq=self.interval,
                                     active=len(active_requests),
                                     processed=all_total_count[0],
                                     loadavg=load_average(),
                                     retry=retry,
                                     **SOFTWARE_INFO)
    

    0xEE 个人信息

    ★★★★★★关于生活和技术的思考★★★★★★

    微信公众账号:罗西的思考

    如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

    在这里插入图片描述

    0xFF 参考

    8: State 和 Result

    6: Events 的实现

  • 相关阅读:
    visual studio 2010设置
    Win7 x64 PL/SQL 连接 Oralce 提示 Could not initialize "%ORACLE_HOME%\bin\oci.dll"
    jxl导入/导出excel
    struts2的action与jsp之间传递参数
    web服务器、容器和中间件
    myeclipse trial expired 注册码解决办法(可用于8.5)
    Java中的内部类
    JS的trim()方法
    struts2 <s:property>用法
    EL表达式的使用
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14696580.html
Copyright © 2011-2022 走看看