zoukankan      html  css  js  c++  java
  • Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor

    Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor - Worker 模式

    Spring Cloud 系列目录 - Eureka 篇

    Eureka 消息广播主要分三部分讲解:

    1. 服务器列表管理:PeerEurekaNodes 管理了所有的 PeerEurekaNode 节点。
    2. 消息广播机制分析:PeerAwareInstanceRegistryImpl 收到客户端的消息后,第一步:先更新本地注册信息;第二步:遍历所有的 PeerEurekaNode,转发给其它节点。
    3. TaskDispacher 消息处理: Acceptor - Worker 模式分析。

    首先回顾一下消息广播的流程,在上一篇 Eureka 系列(05)消息广播(上):消息广播原理分析 中对 Eureka 消息广播的源码进行了分析,PeerAwareInstanceRegistryImpl 将消息广播任务委托给 PeerEurekaNode。PeerEurekaNode 内部采用 TaskDispacher 的 Acceptor - Worker 模式进行异步处理。本文则重点分析这种异步处理机制。

    1. Acceptor - Worker 模式原理

    图1:Acceptor - Worker 模式原理
    sequenceDiagram participant TaskDispatcher participant AcceptorExecutor participant WorkerRunnable note left of TaskDispatcher : 接收消息广播任务<br/>process TaskDispatcher ->> AcceptorExecutor : process WorkerRunnable ->> AcceptorExecutor : requestWorkItem WorkerRunnable ->> AcceptorExecutor : requestWorkItems note right of WorkerRunnable : run opt error WorkerRunnable -->> AcceptorExecutor : reprocess end

    总结: TaskDispatcher 接收消息广播任务,实际由 AcceptorExecutor 线程处理,之后由 WorkerRunnable 线程执行。WorkerRunnable 线程执行逻辑已经分析过了,下面看一下 AcceptorExecutor 的源码。

    AcceptorExecutor 主要方法:

    • process/reprocess 接收消息广播任务,存放到 acceptorQueue/reprocessQueue 队列中。
    • requestWorkItem/requestWorkItems 获取要执行的消息广播任务 singleItemWorkQueue/batchWorkQueue

    2. AcceptorExecutor

    图2:AcceptorRunner 任务处理
    graph LR A(Client) B(reprocessQueue) C(acceptorQueue) D(pendingTasks<br/>processingOrder) E(batchWorkQueue) F(singleItemWorkQueue) A -- reprocess --> B A -- process --> C B -- drainReprocessQueue --> D C -- drainAcceptorQueue --> D D -- assignBatchWork --> E D -- assignSingleItemWork --> F

    总结: AcceptorRunner 线程每 10s 轮询一次,消息广播任务从 acceptorQueue -> pendingTasks -> batchWorkQueue,WorkerRunner 执行线程直接获取 batchWorkQueue 任务执行。

    如果 pendingTasks 任务超载(默认10000)丢弃的原则是:一是丢弃最老的任务和重试的任务,执行最新的任务。二是同taskId的任务只执行最新的任务

    • pendingTasks 队列满后,reprocessQueue 任务会全部丢弃,acceptorQueue 则丢弃最老的任务执行最新的任务。
    • AcceptorExecutor 的初始化是在 PeerEurekaNode 方法中。默认 pendingTasks 的最大任务数为 maxBufferSize=10000个,一次批处理的最大数为 maxBatchingSize=200个,批处理的最大延迟时间为 maxBatchingDelay=500ms。

    2.1 属性

    AcceptorExecutor 内部有多个队列,维护任务的执行,队列的功能如下:

    private final int maxBufferSize;		// pendingTasks队列最大值,默认值 10000
    private final int maxBatchingSize;		// 一次批处理的最大任务数,默认值 250
    private final long maxBatchingDelay;	// 任务的最大延迟时间,默认值 500ms
    
    // 1. 接收消息广播的任务
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue;
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue;
    
    // 2. 默认每 10s 轮询一次,将接收的消息处理一次
    //    AcceptorRunner 单线程处理,所以是普通队列
    private final Map<ID, TaskHolder<ID, T>> pendingTasks;
    private final Deque<ID> processingOrder;
    
    // 3. 即将要处理的消息广播任务
    private final Semaphore singleItemWorkRequests;
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue;
    
    private final Semaphore batchWorkRequests;
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue;
    

    2.2 源码分析

    2.2.1 AcceptorRunner 总体流程

    class AcceptorRunner implements Runnable {
        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    // 1. 将任务从 reprocessQueue/acceptorQueue -> pendingTasks
                    drainInputQueues();
                    int totalItems = processingOrder.size();
                    long now = System.currentTimeMillis();
                    // 2. trafficShaper 流量整行,执行失败后的延迟时间
                    //    congestionRetryDelayMs 100ms 执行一次
                    //    networkFailureRetryMs 1000ms 执行一次
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    // 3. pendingTasks -> batchWorkQueue
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }
                    // 4. 没有可执行的任务了,等待 10s
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                } catch (Throwable e) {
                }
            }
        }
    }
    

    总结: AcceptorRunner 线程每 10s 轮询一次,消息广播任务从从 acceptorQueue -> pendingTasks -> batchWorkQueue,WorkerRunner 执行线程直接获取 batchWorkQueue 任务执行。

    • drainInputQueues 任务从 acceptorQueue -> pendingTasks
    • assignBatchWork/assignSingleItemWork 任务从 pendingTasks -> batchWorkQueue

    2.2.2 drainInputQueues

    drainInputQueues 方法处理 reprocessQueue/acceptorQueue 队列中的任务到 pendingTasks,任务只有到 pendingTasks 中才会被处理,否则就丢弃。

    // 先处理 reprocessQueue,再处理 acceptorQueue。
    // 默认 acceptorQueue 会覆盖 reprocessQueue 中的任务,也就是最新的任务会覆盖重试的任务
    private void drainInputQueues() throws InterruptedException {
        do {
            drainReprocessQueue();	// reprocessQueue
            drainAcceptorQueue();	// acceptorQueue
    
            if (!isShutdown.get()) {
                if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                    TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (taskHolder != null) {
                        appendTaskHolder(taskHolder);
                    }
                }
            }
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }
    

    总结: drainInputQueues 方法将接收的消息广播任务从 reprocessQueue/acceptorQueue -> pendingTasks。如果 pendingTasks 任务大多执行的原则是: 丢弃最老的任务和重试的任务,执行最新的任务。

    1. drainReprocessQueue 处理 reprocessQueue 队列,也就是通过 repocess 接收的任务。当 pendingTasks 队列中的任务超出阈值,重试的任务直接丢弃。
    2. drainAcceptorQueue 处理 acceptorQueue 队列,也就是通过 process 接收的任务。pendingTasks 中最老的任务直接丢弃,将新的任务添加到队列中。
    private void drainReprocessQueue() {
        long now = System.currentTimeMillis();
        // 1. 只要 pendingTasks 没有超过阈值,maxBufferSize=10000
        //    就将重试的任务添加到 pendingTasks 中 
        while (!reprocessQueue.isEmpty() && !isFull()) {
            TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
            ID id = taskHolder.getId();
            if (taskHolder.getExpiryTime() <= now) {
                expiredTasks++;
            } else if (pendingTasks.containsKey(id)) {
                overriddenTasks++;
            } else {
                pendingTasks.put(id, taskHolder);
                processingOrder.addFirst(id);
            }
        }
        // 2. reprocessQueue 队列中剩余的任务全部丢弃。
        if (isFull()) {
            queueOverflows += reprocessQueue.size();
            reprocessQueue.clear();
        }
    }
    

    drainReprocessQueue 方法当任务过多时直接丢弃了重试的任务,drainAcceptorQueue 则不同,丢弃最老的任务,执行最新的任务。目的是保证新任务肯定能执行,而旧的任务根据实际情况丢弃。

    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            appendTaskHolder(acceptorQueue.poll());
        }
    }
    private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
        // 1. 如果任务超出阈值,丢弃最老的任务
        if (isFull()) {
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        // 2. 将最新的任务添加到队列中
        TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
        if (previousTask == null) {
            processingOrder.add(taskHolder.getId());
        } else {
            overriddenTasks++;
        }
    }
    

    总结: 还是那句话,如果 pendingTasks 超出阈值时执行的原则是:

    1. 丢弃最老的任务和重试的任务,执行最新的任务。

    2. 同taskId的任务只执行最新的任务

      appendTaskHolder 会覆盖同名的 taskId 任务,taskId 的生成是在 PeerEurekaNode 接收消息广播任务时生成的,生成的原则是:requestType(任务类型)+ appName(应用名称)+ id(实例id)。任务类型包括:register、cancel、heartbeat、statusUpdate、deleteStatusOverride。

      // PeerEurekaNode.process 是会生成 taskId  
      private static String taskId(String requestType, InstanceInfo info) {
          return taskId(requestType, info.getAppName(), info.getId());
      }
      private static String taskId(String requestType, String appName, String id) {
          return requestType + '#' + appName + '/' + id;
      }
      

    2.2.3 assignBatchWork

    将任务从 pendingTasks 从移动到 batchWorkQueue 中,requestWorkItem 直接获取 batchWorkQueue 进行处理。

    // pendingTasks -> batchWorkQueue
    void assignBatchWork() {
        // 1. pendingTasks 为空则 false,pendingTasks 队列满了肯定为 true。
        //    任务的延迟时间不超过 maxBatchingDelay=500ms
        if (hasEnoughTasksForNextBatch()) {
            if (batchWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                // 2. 批处理任务最大为 maxBatchingSize=250
                int len = Math.min(maxBatchingSize, processingOrder.size());
                List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                while (holders.size() < len && !processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    // 3. 任务过期,直接丢弃
                    if (holder.getExpiryTime() > now) {
                        holders.add(holder);
                    } else {
                        expiredTasks++;
                    }
                }
                // 4. 添加到 batchWorkQueue 队列中
                if (holders.isEmpty()) {
                    batchWorkRequests.release();
                } else {
                    batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                    batchWorkQueue.add(holders);
                }
            }
        }
    }
    

    总结: assignBatchWork 执行的条件:一是 pendingTasks 任务超载了,立即执行;二是任务延迟时间大于 maxBatchingDelay=500ms。目的就是为了控制任务的执行频率:任务太多或延迟时间过长立即执行。

    private boolean hasEnoughTasksForNextBatch() {
        if (processingOrder.isEmpty()) {
            return false;
        }
        // 队列中任务大多立即执行 maxBufferSize=10000
        if (pendingTasks.size() >= maxBufferSize) {
            return true;
        }
    	// 任务延迟时间太长立即执行 maxBatchingDelay=500ms
        TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
        long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
        return delay >= maxBatchingDelay;
    }
    

    3. 问题总结

    3.1 消费速度控制

    (1)服务器忙或网络异常

    当出现服务器忙或网络IO异常时就需要等待一段时间再发送请求。默认情况下:

    • congestionRetryDelayMs:服务器忙时至少 100ms
    • networkFailureRetryMs:网络IO异常时至少 1000ms
    public void run() {
        ...
        if (scheduleTime < now) {
            scheduleTime = now + trafficShaper.transmissionDelay();
        }
        if (scheduleTime <= now) {
            assignBatchWork();
            assignSingleItemWork();
        }
    }
    

    总结: 如果出现服务器忙(503)或网络 IO 异常时,至少要等待一定的时间,再次发送请求。TrafficShaper 是流量整行的意思,即控制请求发送的频率。

    long transmissionDelay() {
        // 没有任务异常,不能等待
        if (lastCongestionError == -1 && lastNetworkFailure == -1) {
            return 0;
        }
    	// 出现对方服务器忙,至少 congestionRetryDelayMs=100ms
        long now = System.currentTimeMillis();
        if (lastCongestionError != -1) {
            long congestionDelay = now - lastCongestionError;
            if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
                return congestionRetryDelayMs - congestionDelay;
            }
            lastCongestionError = -1;
        }
    
    	// 出现网IO异常,至少 networkFailureRetryMs=1000ms
        if (lastNetworkFailure != -1) {
            long failureDelay = now - lastNetworkFailure;
            if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
                return networkFailureRetryMs - failureDelay;
            }
            lastNetworkFailure = -1;
        }
        return 0;
    }
    

    (2)最大任务数限制

    如果消息广播任务超出阈值,丢弃的原则是:一是丢弃最老的任务和重试的任务,执行最新的任务。二是同taskId的任务只执行最新的任务。这在 2.2.2小节有详细的说明,默认 maxBufferSize=10000。

    (3)批处理任务延迟时间

    在 PeerEurekaNode 接收广播任务,生成 TaskHolder 时,会生成任务的提交时间,如果任务延迟赶时间超过 maxBatchingDelay 则立即执行。这个时间是在 hasEnoughTasksForNextBatch 方法中进行控制的。默认 maxBatchingDelay=500ms。

    3.2 Semaphore batchWorkRequests 作用分析

    在任务的处理过程中都会使用 Semaphore 这个信号锁,它的作用是什么呢?

    private final Semaphore batchWorkRequests = new Semaphore(0);
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
    
    // AcceptorRunner 分配任务
    void assignBatchWork() {
        if (hasEnoughTasksForNextBatch()) {
            if (batchWorkRequests.tryAcquire(1)) {
                List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
               	...
                if (holders.isEmpty()) {
                    // 如果没有分配任务,下一次可继续分配任务
                    batchWorkRequests.release();
                } else {
                    // 如果已经分配任务,则必须等到消费才消费才开始重新分配任务
                    // 如果任务一直没有被消费,则 AcceptorRunner 轮询时会丢弃老的任务
                    batchWorkQueue.add(holders);
                }
            }
        }
    }
    // WorkerRunable 获取任务
    BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }
    

    总结: AcceptorRunner 线程轮询时进行任务分配,如果没有获取 Semaphore 锁,也就是说任务一直没有被消费,当 pendingTasks 任务过多,会按照丢弃老的执行新的任务原则进行处理。如果有 WorkerRunable 线程进行消费则会释放锁,重新进行任务分配。


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    线性表单链表的实现
    线性表顺序存储结构
    【C语言】产生随机数
    TCP/IP协议
    【bfs】奇怪的电梯(P1135)
    【DFS】取数游戏(P1123)
    【DFS】圣诞夜的极光(P1454)
    【贪心】骑士的工作(P2695)
    【贪心】纪念品分组(P1094)
    生活的那么一点反思
  • 原文地址:https://www.cnblogs.com/binarylei/p/11617710.html
Copyright © 2011-2022 走看看