zoukankan      html  css  js  c++  java
  • Kafka Consumer2

    本文记录了和conumser相关的几个类。
    首先是RequestFuture这个类,consumer和服务端通信使用它作为返回值。
    其次是HeartBeat机制,consumer和coordinator通过它来获取对方的状态,并进行相应的处理。
    然后是SubscriptionState,consumer消费topic的信息都保存在这个类中。
    最后是Fetcher, 它负责从broker中拉取数据。

    RequestFuture

    RequestFuture 是ConsumerNetworkClient发出请求的异步返回值。当请求结果返回后,会对结果进行分析,并且遍历listeners处理请求结果。

    这样子,就会有三个变量:

    1. Object 对象 INCOMPLETE_SENTINEL 表示 请求结果没有完成。
    2. AtomicReference
    3. ConcurrentLinkedQueue<RequestFutureListener> listeners 表示一个监听者列表,对返回结果进行处理。
      1. RequestFutureListener 里面有 onSuccess 和 onFailure 方法分别用于处理成功和失败的请求。

    RequestFuture有以下几个方法:

    1. isDone:future已经完成了,可以被后续handler处理。
    2. succeeded: future已经完成了,并且结果没有错误。
    3. failed: 查看返回结果有没有出错。
    4. isRetriable: 如果返回结果出错了,查看这个request是不是可以重试。
    5. complete: 查看结果是是不是正确的,如果是调用listeners的 onSuccess方法进行处理。
    6. fireSuccess: 调用listeners的 onSuccess方法进行处理
    7. fireFailure: 调用listeners的 onFail方法进行处理
    8. addListener: 添加监听者
    9. compose: 将一种类型的RequestFuture 转化为另外一种类型
    10. chain 将一个 RequestFuture 转化为RequestFutureListener,并添加到监听者队列。

    RequestFuture的使用方法:

    
         RequestFuture<ClientResponse> future = client.send(api, request);
          client.poll(future);
     
          if (future.succeeded()) {
              ClientResponse response = future.value();
              // Handle response
          } else {
              throw future.exception();
          }
    

    当返回了RequestFuture ,因为逻辑要求,要转化为另外一种RequestFuture。
    RequestFutureAdapter<F, T> 就是用来处理这种需求的。

    
    public abstract class RequestFutureAdapter<F, T> {
    
        public abstract void onSuccess(F value, RequestFuture<T> future);
    
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            future.raise(e);
        }
    }
    

    HeartBeat

    HeartBeat 主要有两个类,一个是HeartBeat类,用来管理heartBeat,一个是HeartbeatThread,用来和服务器的coordinator进行通信。

    HeartBeat

    作为一个心跳管理类,HeartBeat保存了下面几个变量:

    //和coordinator通信的最长时间,如果超过sessionTimeout,就认为coordinator挂了
        private final long sessionTimeout;
    // 心跳间隔, 正常来说每次的时间间隔都是这个值
        private final long heartbeatInterval;
    // consumer发送心跳的最长间隔,如果超过这个间隔,就认为consumer脱离了消费组
        private final long maxPollInterval;
        // 返回失败后,重试等待的时间
        private final long retryBackoffMs;
        // 上次发送的时间
        private volatile long lastHeartbeatSend; // volatile since it is read by metrics
        // 上次接收到返回的时间
        private long lastHeartbeatReceive;
        // 上次重置session的时间,重新加入消费组以及重启thread都会重置为当前时间。
        //它会和lastHeartbeatSend做比较,那个时间比较晚,就使用哪个作为上次发送哦的时间
        private long lastSessionReset;
        // consumer每次调用发送心跳的时间,如果now - lastPoll > maxPollInterval 就认为consumer maybeLeaveGroup
        private long lastPoll;
        // 心跳返回错误,就设置为true,这时候就将发送的时间间隔设置为retryBackoffMs
        private boolean heartbeatFailed;
    

    HeartBeat 最重要的作用就是计算当前到下次要发送heartbeat的时间间隔。

        public long timeToNextHeartbeat(long now) {
            long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
            final long delayToNextHeartbeat;
            if (heartbeatFailed)
                delayToNextHeartbeat = retryBackoffMs;
            else
                delayToNextHeartbeat = heartbeatInterval;
    
            if (timeSinceLastHeartbeat > delayToNextHeartbeat)
                return 0;
            else
                return delayToNextHeartbeat - timeSinceLastHeartbeat;
        }
    

    HeartBeatThread

    HeartBeatThread 的主要逻辑就是等待下次要发送的时间,发送一次心跳,并查看返回值。

    它对AbstractCoordinator.this进行了同步, 然后判断当前满足发送心跳的条件后,通过sendHeartbeatRequest发送心跳请求。心跳包含下面的内容:

    private final String groupId;
    private final int groupGenerationId;
    private final String memberId;
    

    SubscriptionState

    这个类用来保存消费者消费的topic, partition, offset的信息。
    consumer通过这个类subscribe topic。

    这个类里面最重要的变量应该就是:

       private final PartitionStates<TopicPartitionState> assignment;
    

    assignment 里面保存了这个consumer分配到的TopicPartition,以及这个parition当前的消费状态。
    PartitionStates 类分装一个LinkedHashMap,它保存了<TopicPartition, TopicPartitionState>键值对。

    TopicPartition里面包含了

    private final int partition;
    private final String topic;
    

    TopicPartitionState 包含了

    
    private Long position; // last consumed position
    private OffsetAndMetadata committed;  // last committed position
    private boolean paused;  // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
    

    通过上面这两个变量就知道了当前consumer的操作状态,所有的函数基本都是围绕着assignment 进行操作。

    Fetcher

    fetcher负责从broker中拉取数据,并保存在一个队列中。consumer 在poll 的时候,首先会从这个队列中拿一部分数据进行处理。如果队列中没有数据了,fetcher 会再次拉取数据。

    fetcher会通过sendFetches拉取数据, 并将结果保存在
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;中。 然后 consumer在poll 的时候,就会调用fetchedRecords从completedFetches中拉取数据。

    了解了上面这些信息,再看pollOnce的逻辑,就明白多了:

        private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            coordinator.poll(time.milliseconds());
    
            // fetch positions if we have partitions we're subscribed to that we
            // don't know the offset for
            //更新offset
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions());
    
            // if data is available already, return it immediately
            // 如果 completedFetches 队列中有数据,就直接拿数据
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
            if (!records.isEmpty())
                return records;
    
            // send any new fetches (won't resend pending fetches)
            // 准备好发送请求
            fetcher.sendFetches();
            
            long now = time.milliseconds();
            long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
            // 发送请求到服务端,但是如果之前发送的fectch请求还在路上的话,就block等待。
            client.poll(pollTimeout, now, new PollCondition() {
                @Override
                public boolean shouldBlock() {
                    // since a fetch might be completed by the background thread, we need this poll condition
                    // to ensure that we do not block unnecessarily in poll()
                    return !fetcher.hasCompletedFetches();
                }
            });
    
            // after the long poll, we should check whether the group needs to rebalance
            // prior to returning data so that the group can stabilize faster
            if (coordinator.needRejoin())
                return Collections.emptyMap();
            // 再次返回数据
            return fetcher.fetchedRecords();
        }
    
    
  • 相关阅读:
    MYSQL中replace into的用法
    Typora自定义样式
    Advanced Installer轻松带你入门
    H2数据库入门,看这篇就对了
    Linux之vim的使用
    Linux文件上传与下载
    @ConfigurationProperties 注解使用姿势,这一篇就够了
    Javadoc 使用详解
    MySQL学习提升
    JS前端获取用户的ip地址的方法
  • 原文地址:https://www.cnblogs.com/SpeakSoftlyLove/p/7251616.html
Copyright © 2011-2022 走看看