zoukankan      html  css  js  c++  java
  • [源码解析] 并行分布式任务队列 Celery 之 负载均衡

    [源码解析] 并行分布式任务队列 Celery 之 负载均衡

    0x00 摘要

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。本文介绍 Celery 的负载均衡机制。

    Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。

    0x01 负载均衡

    Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。

    • 在 worker 决定 与 哪几个 queue 交互,有一个负载均衡(对于 queues );
    • 在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡(决定哪一个 worker 来处理任务);
    • 在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡(决定 worker 内部哪几个进程)。

    注意,这个顺序是从 worker 读取任务处理任务的角度 出发,而不是从系统架构角度出发。

    因为从系统架构角度说,应该是 which worker ----> which queue in the worker ----> which subprocess in the worker 这个角度。

    我们下面按照 "worker 读取任务处理任务角度" 的顺序进行分析。

    1.1 哪几个 queue

    Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。

    • Kombu 是循环调用,每次调用会制定读取哪些内部queues的消息;
    • queue 这个逻辑概念,其实就是对应了 redis 中的一个 物理key,从 queue 读取,就代表 BRPOP 需要指定 监听的 key。
    • Kombu 是在每一次监听时候,根据这些 queues 得到 其在 redis 之中对应的物理keys,即都指定监听哪些 redis keys;
    • brpop是个多key命令,当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。这样就得到了这些 逻辑queue 对应的消息。

    因为 task 可能会 用到多个 queue,所以具体从哪几个queue 读取?这时候就用到了策略

    1.1.1 _brpop_start 选择下次读取的queue

    Kombu 在每次监听时候,调用 _brpop_start 完成监听。其作用就是 选择下一次读取的queues

    _brpop_start 如下:

    def _brpop_start(self, timeout=1):
        # 得到一些内部queues
        queues = self._queue_cycle.consume(len(self.active_queues))
        if not queues:
            return
        # 得到queue对应的keys  
        keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
                for queue in queues] + [timeout or 0]
        self._in_poll = self.client.connection
        self.client.connection.send_command('BRPOP', *keys) # 利用这些keys,从redis内部获取key的消息
    

    此时变量如下:

    self.active_queues = {set: 1} {'celery'}
    
    len(self.active_queues) = {int} 1
      
    self._queue_cycle = {round_robin_cycle} <kombu.utils.scheduling.round_robin_cycle object at 0x0000015A7EE9DE88>
      
    self = {Channel} <kombu.transport.redis.Channel object at 0x0000015A7EE31048>
    

    所以_brpop_start 就是从 self._queue_cycle 获得几个需要读取的queue。

    具体如下图:

                                                                  +
                                                      Kombu       |          Redis
                                                                  |
                                                                  |
    +--------------------------------------------+                |
    |  Worker                                    |                |
    |                                            |                |           queue 1 key
    |   +-----------+                            |                |
    |   | queue 1   |                            |   BRPOP(keys)  |
    |   | queue 2   |                    keys    |                |
    |   | ......    | +--------+---------------------------------------->     queue 2 key
    |   | queue n   |          ^                 |                |
    |   +-----------+          | keys            |                |
    |                          |                 |                |
    |                          |                 |                |           queue 3 key
    |            +-------------+------------+    |                |
    |            |         Keys list        |    |                |
    |            |                          |    |                |
    |            +--------------------------+    |                |
    +--------------------------------------------+                |
                                                                  |
                                                                  |
                                                                  |
                                                                  |
                                                                  |
                                                                  +
    
    

    1.1.2 round_robin_cycle 设置下次读取的 queue

    从上面代码中,我们可以知道 consume 就是返回 round_robin_cycle 中前几个 queue,即 return self.items[:n]。

    而 self.items 的维护,是通过 rotate 完成的,就是把 最近用的 那个 queue 放到队列最后,这样给其他 queue 机会,就是 round robin 的概念了。

    class round_robin_cycle:
        """Iterator that cycles between items in round-robin."""
    
        def __init__(self, it=None):
            self.items = it if it is not None else []
    
        def update(self, it):
            """Update items from iterable."""
            self.items[:] = it
    
        def consume(self, n):
            """Consume n items."""
            return self.items[:n]
    
        def rotate(self, last_used):
            """Move most recently used item to end of list."""
            items = self.items
            try:
                items.append(items.pop(items.index(last_used)))
            except ValueError:
                pass
            return last_used
    
    

    比如在如下代码中,当读取到消息之后,就会调用 self._queue_cycle.rotate(dest) 进行调整。

        def _brpop_read(self, **options):
            try:
                try:
                    dest__item = self.client.parse_response(self.client.connection,
                                                            'BRPOP',
                                                            **options)
                except self.connection_errors:
                    # if there's a ConnectionError, disconnect so the next
                    # iteration will reconnect automatically.
                    self.client.connection.disconnect()
                    raise
                if dest__item:
                    dest, item = dest__item
                    dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                    self._queue_cycle.rotate(dest) # 这里进行调整
                    self.connection._deliver(loads(bytes_to_str(item)), dest)
                    return True
                else:
                    raise Empty()
            finally:
                self._in_poll = None
    
    

    具体如下图:

                                                                  +
                                                      Kombu       |          Redis
                                                                  |
                                                                  |
    +--------------------------------------------+                |
    |  Worker                                    |                |
    |                                            |                |           queue 1 key
    |   +-----------+                            |                |
    |   | queue 1   |                            |   BRPOP(keys)  |
    |   | queue 2   |                    keys    |                |
    |   | ......    | +--------+---------------------------------------->     queue 2 key
    |   | queue n   |          ^                 |                |
    |   +-----------+          | keys            |                |
    |                          |                 |                |
    |                          +                 |                |           queue 3 key
    |                 round_robin_cycle          |                |
    |                          +                 |                |
    |                          |                 |                |
    |                          |                 |                |
    |            +-------------+------------+    |                |
    |            |         Keys list        |    |                |
    |            +--------------------------+    |                |
    +--------------------------------------------+                |
                                                                  |
                                                                  +
    

    1.2 哪一个worker

    如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制因为redis 的单进程处理,所以只能有一个 worker 才能读到

    这本身就是一个负载均衡。这个和 spring quartz 的负载均衡实现非常类似。

    • spring quartz 是 多个节点读取 同一个数据库记录决定谁能开始下一次处理,哪一个得到了数据库锁 就是哪个。
    • Kombu 是通过 多个 worker 读取 redis "同一个或者一组key" 的 实际结果 来决定 "哪一个 worker 能开始下一次处理"。

    具体如下图:

                                                                +
                                                Kombu           |    Redis
                                                                |
                                                                |
    +--------------------------------------+                    |
    |  Worker 1                            |                    |
    |                                      |                    |
    |   +-----------+                      |                    |
    |   | queue 1   |                      |   BRPOP(keys)      |
    |   | queue 2   |              keys    |                    |
    |   | ......    | +--------+-----------------------------+  |
    |   | queue n   |          ^           |                 |  |
    |   +-----------+          |  keys     |                 |  |
    |                          |           |                 |  |
    |                          +           |                 |  |
    |                  round_robin_cycle   |                 |  |               +--> queue 1 key
    |                          ^           |                 |  |               |
    |                          |           |                 |  |               |
    |                          |           |                 |  | Single Thread |
    |             +------------+---------+ |                 +---------------------> queue 2 key
    |             |     keys list        | |                 |  |               |
    |             +----------------------+ |                 |  |               |
    +--------------------------------------+                 |  |               |
                                                             |  |               +--> queue 3 key
                                                             |  |
    +--------------------------------------+                 |  |
    | Worker 2                             |   BRPOP(keys)   |  |
    |                                      | +---------------+  |
    |                                      |                 |  |
    +--------------------------------------+                 |  |
                                                             |  |
    +--------------------------------------+   BRPOP(keys)   |  |
    | Worker 3                             |                 |  |
    |                                      |  +--------------+  |
    |                                      |                    +
    |                                      |
    +--------------------------------------+
    
    

    1.3 哪一个进程

    进程池中,使用了策略来决定具体使用哪一个进程来处理任务。

    1.3.1 策略

    先讲解 strategy。在 AsynPool 启动有如下,配置了策略:

    class AsynPool(_pool.Pool):
        """AsyncIO Pool (no threads)."""
    
        def __init__(self, processes=None, synack=False,
                     sched_strategy=None, proc_alive_timeout=None,
                     *args, **kwargs):
            self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                       sched_strategy)
    

    于是我们看看 strategy 定义如下,基本由名字可以知道其策略意义:

    SCHED_STRATEGY_FCFS = 1 # 先来先服务
    SCHED_STRATEGY_FAIR = 4 # 公平
    
    SCHED_STRATEGIES = {
        None: SCHED_STRATEGY_FAIR,
        'default': SCHED_STRATEGY_FAIR,
        'fast': SCHED_STRATEGY_FCFS,
        'fcfs': SCHED_STRATEGY_FCFS,
        'fair': SCHED_STRATEGY_FAIR,
    }
    

    1.3.2 公平调度

    我们讲讲公平调度的概念。

    不同系统对于公平调度的理解大同小异,我们举几个例子看看。

    • Linux 中,调度器必须在各个进程之间尽可能公平地共享CPU时间,而同时又要考虑不同的任务优先级。一般原理是:按所需分配的计算能力,向系统中每个进程提供最大的公正性,或者从另外一个角度上说, 试图确保没有进程被亏待。
    • Hadoop 中,公平调度是一种赋予作业(job)资源的方法,它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。当单独一个作业在运行时,它将使用整个集群。当有其它作业被提交上来时,系统会将任务(task)空闲时间片(slot)赋给这些新的作业,以使得每一个作业都大概获取到等量的CPU时间。
    • Yarn 之中,Fair Share指的都是Yarn根据每个队列的权重、最大,最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。

    1.3.3 公平调度 in Celery

    在 asynpool之中,有设置,看看"是否为 fair 调度":

    is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR         
    

    基于 is_fair_strategy 这个变量,Celery 的公平调度有几处体现。

    在开始 poll 时候,如果是 fair,则需要 存在 idle worker 才调度,这样就给了 idler worker 一个调度机会

    def on_poll_start():
        # Determine which io descriptors are not busy
        inactive = diff(active_writes)
    
        # Determine hub_add vs hub_remove strategy conditional
        if is_fair_strategy:
            # outbound buffer present and idle workers exist
            add_cond = outbound and len(busy_workers) < len(all_inqueues)
        else:  # default is add when data exists in outbound buffer
            add_cond = outbound
    
        if add_cond:  # calling hub_add vs hub_remove
            iterate_file_descriptors_safely(
                inactive, all_inqueues, hub_add,
                None, WRITE | ERR, consolidate=True)
        else:
            iterate_file_descriptors_safely(
                inactive, all_inqueues, hub_remove)
    

    在具体发布 写操作 时候,也会看看是否 worker 已经正在忙于执行某一个 task,如果正在执行,就不调度,这样就给了其他 不忙worker 一个调度的机会

            def schedule_writes(ready_fds, total_write_count=None):
                if not total_write_count:
                    total_write_count = [0]
                # Schedule write operation to ready file descriptor.
                # The file descriptor is writable, but that does not
                # mean the process is currently reading from the socket.
                # The socket is buffered so writable simply means that
                # the buffer can accept at least 1 byte of data.
    
                # This means we have to cycle between the ready fds.
                # the first version used shuffle, but this version
                # using `total_writes % ready_fds` is about 30% faster
                # with many processes, and also leans more towards fairness
                # in write stats when used with many processes
                # [XXX On macOS, this may vary depending
                # on event loop implementation (i.e, select/poll vs epoll), so
                # have to test further]
                num_ready = len(ready_fds)
    
                for _ in range(num_ready):
                    ready_fd = ready_fds[total_write_count[0] % num_ready]
                    total_write_count[0] += 1
                    if ready_fd in active_writes:
                        # already writing to this fd
                        continue 
                    if is_fair_strategy and ready_fd in busy_workers: # 是否调度
                        # worker is already busy with another task
                        continue
                    if ready_fd not in all_inqueues:
                        hub_remove(ready_fd)
                        continue
    

    具体逻辑如下:

                                                              +
                                                Kombu         |    Redis
                                                              |
                                             BRPOP(keys)      |
    +------------------------------------+                    |
    |  Worker 1                          | +---------------+  |
    |                                    |                 |  |
    +------------------------------------+                 |  |                     queue 1 key
                                                           |  |                 +->
                                                           |  |                 |
    +------------------------------------+     BRPOP(keys) |  |  Single thread  |
    | Worker 2                           | +--------------------------------------> queue 2 key
    |                                    |                 |  |  (which worker) |
    +------------------------------------+                 |  |                 |
                                                           |  |                 |
    +------------------------------------+                 |  |                 +-> queue 3 key
    | Worker 3                           |                 |  |
    |                                    |                 |  |
    |     +-----------+                  |                 |  |
    |     | queue 1   |                  |     BRPOP(keys) |  |
    |     | queue 2   |          keys    |                 |  |
    |     | ......    | +--------+-------------------------+  |
    |     | queue n   |          ^       |                    |
    |     +-----------+          | keys  |                    |
    |                            |       |                    |
    |                            +       |                    |
    |                 round_robin_cycle (which queues)        |
    |                            ^       |                    |
    |                            |       |                    |
    |                            |       |                    |
    |                       +----+----+  |                    |
    |             +         |keys list|  |                    |
    |             |         +---------+  |                    |
    +------------------------------------+                    |
                  |                                           |
                  |  fair_strategy(which subprocess)          |
                  |                                           |
          +-------+----------+----------------+               |
          |                  |                |               |
          v                  v                v               |
    +-----+--------+  +------+-------+  +-----+--------+      |
    | subprocess 1 |  | subprocess 2 |  | subprocess 3 |      +
    +--------------+  +--------------+  +--------------+
    
    

    0x02 Autoscaler

    Autoscaler 的作用 实际就是在线调节进程池大小。这也和缓解负载相关,所以放在这里一起论述。

    2.1 调用时机

    在 WorkerComponent 中可以看到,为 AutoScaler 注册了两个调用途径:

    • 注册在 consumer 消息响应方法中,这样消费时候如果有需要,就会调整;
    • 利用 Hub 的 call_repeatedly 方法注册了周期任务,即周期看看是否需要调整。

    这样就会最大程度的加大调用频率。

    class WorkerComponent(bootsteps.StartStopStep):
        """Bootstep that starts the autoscaler thread/timer in the worker."""
    
        def create(self, w):
            scaler = w.autoscaler = self.instantiate(
                w.autoscaler_cls,
                w.pool, w.max_concurrency, w.min_concurrency,
                worker=w, mutex=DummyLock() if w.use_eventloop else None,
            )
            return scaler if not w.use_eventloop else None
    
        def register_with_event_loop(self, w, hub):
            w.consumer.on_task_message.add(w.autoscaler.maybe_scale) # 消费时候如果有需要,就会调整
            
            hub.call_repeatedly( # 周期看看是否需要调整
                w.autoscaler.keepalive, w.autoscaler.maybe_scale,
            )
    

    2.2 具体实现

    2.2.1 bgThread

    Autoscaler 是Background thread,这样 AutoScaler就可以在后台运行:

    class bgThread(threading.Thread):
        """Background service thread."""
    
        def run(self):
            body = self.body
            shutdown_set = self._is_shutdown.is_set
            try:
                while not shutdown_set():
                	body()
            finally:
                self._set_stopped()
    

    2.2.2 定义

    Autoscaler 的定义如下,可以看到其逻辑就是定期判断是否需要调整:

    • 如果当前并发已经到了最大,则下调;
    • 如果到了最小并发,则上调;
    • 则具体上调下调的,都是通过具体线程池函数做到的,这就是要根据具体操作系统来进行分析,此处略过。
    class Autoscaler(bgThread):
        """Background thread to autoscale pool workers."""
    
        def __init__(self, pool, max_concurrency,
                     min_concurrency=0, worker=None,
                     keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
            super().__init__()
            self.pool = pool
            self.mutex = mutex or threading.Lock()
            self.max_concurrency = max_concurrency
            self.min_concurrency = min_concurrency
            self.keepalive = keepalive
            self._last_scale_up = None
            self.worker = worker
    
        def body(self):
            with self.mutex:
                self.maybe_scale()
            sleep(1.0)
    
        def _maybe_scale(self, req=None):
            procs = self.processes
            cur = min(self.qty, self.max_concurrency)
            if cur > procs:
                self.scale_up(cur - procs)
                return True
            cur = max(self.qty, self.min_concurrency)
            if cur < procs:
                self.scale_down(procs - cur)
                return True
    
        def maybe_scale(self, req=None):
            if self._maybe_scale(req):
                self.pool.maintain_pool()
    
        def update(self, max=None, min=None):
            with self.mutex:
                if max is not None:
                    if max < self.processes:
                        self._shrink(self.processes - max)
                    self._update_consumer_prefetch_count(max)
                    self.max_concurrency = max
                if min is not None:
                    if min > self.processes:
                        self._grow(min - self.processes)
                    self.min_concurrency = min
                return self.max_concurrency, self.min_concurrency
    
        def scale_up(self, n):
            self._last_scale_up = monotonic()
            return self._grow(n)
    
        def scale_down(self, n):
            if self._last_scale_up and (
                    monotonic() - self._last_scale_up > self.keepalive):
                return self._shrink(n)
    
        def _grow(self, n):
            self.pool.grow(n)
    
        def _shrink(self, n):
    		self.pool.shrink(n)
    
        def _update_consumer_prefetch_count(self, new_max):
            diff = new_max - self.max_concurrency
            if diff:
                self.worker.consumer._update_prefetch_count(
                    diff
                )
    
        @property
        def qty(self):
            return len(state.reserved_requests)
    
        @property
        def processes(self):
            return self.pool.num_processes
    

    0xEE 个人信息

    ★★★★★★关于生活和技术的思考★★★★★★

    微信公众账号:罗西的思考

    如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

    在这里插入图片描述

    0xFF 参考

    Hadoop公平调度器指南

    浅析Linux中完全公平调度——CFS

    yarn公平调度详细分析(一)

  • 相关阅读:
    svg文件使用highmap显示
    动静分离
    angular 零碎
    使用doxmate生成文档
    javascript之console篇
    java 中String与StringBuilder 效率
    highcharts 组合chart
    js 攻坚克难
    html base 又一重大发现
    sql 分析 依赖beanutils
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14747918.html
Copyright © 2011-2022 走看看