zoukankan      html  css  js  c++  java
  • Kafka Consumer2

    本文记录了和conumser相关的几个类。
    首先是RequestFuture这个类,consumer和服务端通信使用它作为返回值。
    其次是HeartBeat机制,consumer和coordinator通过它来获取对方的状态,并进行相应的处理。
    然后是SubscriptionState,consumer消费topic的信息都保存在这个类中。
    最后是Fetcher, 它负责从broker中拉取数据。

    RequestFuture

    RequestFuture 是ConsumerNetworkClient发出请求的异步返回值。当请求结果返回后,会对结果进行分析,并且遍历listeners处理请求结果。

    这样子,就会有三个变量:

    1. Object 对象 INCOMPLETE_SENTINEL 表示 请求结果没有完成。
    2. AtomicReference
    3. ConcurrentLinkedQueue<RequestFutureListener> listeners 表示一个监听者列表,对返回结果进行处理。
      1. RequestFutureListener 里面有 onSuccess 和 onFailure 方法分别用于处理成功和失败的请求。

    RequestFuture有以下几个方法:

    1. isDone:future已经完成了,可以被后续handler处理。
    2. succeeded: future已经完成了,并且结果没有错误。
    3. failed: 查看返回结果有没有出错。
    4. isRetriable: 如果返回结果出错了,查看这个request是不是可以重试。
    5. complete: 查看结果是是不是正确的,如果是调用listeners的 onSuccess方法进行处理。
    6. fireSuccess: 调用listeners的 onSuccess方法进行处理
    7. fireFailure: 调用listeners的 onFail方法进行处理
    8. addListener: 添加监听者
    9. compose: 将一种类型的RequestFuture 转化为另外一种类型
    10. chain 将一个 RequestFuture 转化为RequestFutureListener,并添加到监听者队列。

    RequestFuture的使用方法:

    
         RequestFuture<ClientResponse> future = client.send(api, request);
          client.poll(future);
     
          if (future.succeeded()) {
              ClientResponse response = future.value();
              // Handle response
          } else {
              throw future.exception();
          }
    

    当返回了RequestFuture ,因为逻辑要求,要转化为另外一种RequestFuture。
    RequestFutureAdapter<F, T> 就是用来处理这种需求的。

    
    public abstract class RequestFutureAdapter<F, T> {
    
        public abstract void onSuccess(F value, RequestFuture<T> future);
    
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            future.raise(e);
        }
    }
    

    HeartBeat

    HeartBeat 主要有两个类,一个是HeartBeat类,用来管理heartBeat,一个是HeartbeatThread,用来和服务器的coordinator进行通信。

    HeartBeat

    作为一个心跳管理类,HeartBeat保存了下面几个变量:

    //和coordinator通信的最长时间,如果超过sessionTimeout,就认为coordinator挂了
        private final long sessionTimeout;
    // 心跳间隔, 正常来说每次的时间间隔都是这个值
        private final long heartbeatInterval;
    // consumer发送心跳的最长间隔,如果超过这个间隔,就认为consumer脱离了消费组
        private final long maxPollInterval;
        // 返回失败后,重试等待的时间
        private final long retryBackoffMs;
        // 上次发送的时间
        private volatile long lastHeartbeatSend; // volatile since it is read by metrics
        // 上次接收到返回的时间
        private long lastHeartbeatReceive;
        // 上次重置session的时间,重新加入消费组以及重启thread都会重置为当前时间。
        //它会和lastHeartbeatSend做比较,那个时间比较晚,就使用哪个作为上次发送哦的时间
        private long lastSessionReset;
        // consumer每次调用发送心跳的时间,如果now - lastPoll > maxPollInterval 就认为consumer maybeLeaveGroup
        private long lastPoll;
        // 心跳返回错误,就设置为true,这时候就将发送的时间间隔设置为retryBackoffMs
        private boolean heartbeatFailed;
    

    HeartBeat 最重要的作用就是计算当前到下次要发送heartbeat的时间间隔。

        public long timeToNextHeartbeat(long now) {
            long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
            final long delayToNextHeartbeat;
            if (heartbeatFailed)
                delayToNextHeartbeat = retryBackoffMs;
            else
                delayToNextHeartbeat = heartbeatInterval;
    
            if (timeSinceLastHeartbeat > delayToNextHeartbeat)
                return 0;
            else
                return delayToNextHeartbeat - timeSinceLastHeartbeat;
        }
    

    HeartBeatThread

    HeartBeatThread 的主要逻辑就是等待下次要发送的时间,发送一次心跳,并查看返回值。

    它对AbstractCoordinator.this进行了同步, 然后判断当前满足发送心跳的条件后,通过sendHeartbeatRequest发送心跳请求。心跳包含下面的内容:

    private final String groupId;
    private final int groupGenerationId;
    private final String memberId;
    

    SubscriptionState

    这个类用来保存消费者消费的topic, partition, offset的信息。
    consumer通过这个类subscribe topic。

    这个类里面最重要的变量应该就是:

       private final PartitionStates<TopicPartitionState> assignment;
    

    assignment 里面保存了这个consumer分配到的TopicPartition,以及这个parition当前的消费状态。
    PartitionStates 类分装一个LinkedHashMap,它保存了<TopicPartition, TopicPartitionState>键值对。

    TopicPartition里面包含了

    private final int partition;
    private final String topic;
    

    TopicPartitionState 包含了

    
    private Long position; // last consumed position
    private OffsetAndMetadata committed;  // last committed position
    private boolean paused;  // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
    

    通过上面这两个变量就知道了当前consumer的操作状态,所有的函数基本都是围绕着assignment 进行操作。

    Fetcher

    fetcher负责从broker中拉取数据,并保存在一个队列中。consumer 在poll 的时候,首先会从这个队列中拿一部分数据进行处理。如果队列中没有数据了,fetcher 会再次拉取数据。

    fetcher会通过sendFetches拉取数据, 并将结果保存在
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;中。 然后 consumer在poll 的时候,就会调用fetchedRecords从completedFetches中拉取数据。

    了解了上面这些信息,再看pollOnce的逻辑,就明白多了:

        private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            coordinator.poll(time.milliseconds());
    
            // fetch positions if we have partitions we're subscribed to that we
            // don't know the offset for
            //更新offset
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions());
    
            // if data is available already, return it immediately
            // 如果 completedFetches 队列中有数据,就直接拿数据
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
            if (!records.isEmpty())
                return records;
    
            // send any new fetches (won't resend pending fetches)
            // 准备好发送请求
            fetcher.sendFetches();
            
            long now = time.milliseconds();
            long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
            // 发送请求到服务端,但是如果之前发送的fectch请求还在路上的话,就block等待。
            client.poll(pollTimeout, now, new PollCondition() {
                @Override
                public boolean shouldBlock() {
                    // since a fetch might be completed by the background thread, we need this poll condition
                    // to ensure that we do not block unnecessarily in poll()
                    return !fetcher.hasCompletedFetches();
                }
            });
    
            // after the long poll, we should check whether the group needs to rebalance
            // prior to returning data so that the group can stabilize faster
            if (coordinator.needRejoin())
                return Collections.emptyMap();
            // 再次返回数据
            return fetcher.fetchedRecords();
        }
    
    
  • 相关阅读:
    Struts2+Spring3+Mybatis3开发环境搭建
    spring+struts2+mybatis
    【LeetCode】Populating Next Right Pointers in Each Node
    【LeetCode】Remove Duplicates from Sorted Array
    【LeetCode】Remove Duplicates from Sorted Array II
    【LeetCode】Binary Tree Inorder Traversal
    【LeetCode】Merge Two Sorted Lists
    【LeetCode】Reverse Integer
    【LeetCode】Same Tree
    【LeetCode】Maximum Depth of Binary Tree
  • 原文地址:https://www.cnblogs.com/SpeakSoftlyLove/p/7251616.html
Copyright © 2011-2022 走看看