zoukankan      html  css  js  c++  java
  • kafka consumer代码梳理

    kafka consumer是一个单纯的单线程程序,因此相对于producer会更好理解些。阅读consumer代码的关键是理解回调,因为consumer中使用了大量的回调函数。参看kafka中的回调函数

    1 整体流程

    从KafkaConsumer#pollOnce(..)入口来看consumer的整体流程

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            coordinator.ensureCoordinatorReady(); // 发送获取coordinator请求,直到获取到coordinator
    
            if (subscriptions.partitionsAutoAssigned())
                coordinator.ensurePartitionAssignment(); // 发送joinGroup和syncGroup,直到获取到consumer被分配的parttion信息;并启动心跳
    
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions()); // 拉取offset信息和commited信息,以便拉取数据的时候直到从哪开始拉取
    
            long now = time.milliseconds();
    
            client.executeDelayedTasks(now);
    
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 从本地数据结构中读取,并不是发送请求
    
            if (!records.isEmpty()) // 如果获取到就直接返回
                return records;
    
            fetcher.sendFetches(); // 发送拉取数据请求
            client.poll(timeout, now); // 真正的发送
            return fetcher.fetchedRecords(); // 从本地数据结构中读取,并不是发送请求
        }
    

    2 Reblance joinGroup和syncGroup

    consumer需要向coordinator发送请求,来知道自己负责消费哪些topic的哪些partiton。这个过程可以分为两个请求:

    1. joinGroup。joinGroup请求加入消费组,一旦coordinator确定了所有成员都发送了joinGroup,就会返回给客户端response,response中包括memberid、generation、consumer是否是leader等信息。
    2. syncGroup。如果consumer是leader的话,他会在本地将已经分配好的partiton信息附加到request中,告诉coordinator,我是这样分配的。这里需要注意consumer分区的分配是放在consumer端的。如果是普通的非leader consumer,那么就是简单的请求。无论是leader还是普通的消费者, coordinator都会返回consumer需要消费的parttion列表。

    joinGroup和syncGroup的主要逻辑在AbstractCoordinator#ensureActiveGroup(..),在发送join和sync之前会提交一把offset,这样做是为了防止reblance造成的重复消费。

    发送sync请求是在join请求的回调函数中,即AbstractCoordinator#JoinGroupResponseHandler(..),也就是说当join请求返回后,调用response的时候会发送一次sync请求。

    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    
            @Override
            public JoinGroupResponse parse(ClientResponse response) {
                return new JoinGroupResponse(response.responseBody());
            }
    
            @Override
            public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
                Errors error = Errors.forCode(joinResponse.errorCode());
                if (error == Errors.NONE) {
                    log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                    AbstractCoordinator.this.memberId = joinResponse.memberId(); // 读取response中的memberid
                    AbstractCoordinator.this.generation = joinResponse.generationId(); // generationId
                    AbstractCoordinator.this.rejoinNeeded = false;
                    AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                    sensors.joinLatency.record(response.requestLatencyMs());
    
                    // 发送sync请求
                    if (joinResponse.isLeader()) {
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                    // 省略其他
                } 
            }
        }
    

    需要注意的是,kafka一个group可以消费多个topic,假设如果有两个topic:TopicA和TopicB,他们分别都有一个消费组名字都叫test,如果TopicA的test内消费者数量变化引起reblance,会造成TopicB的test也会reblance的。可以看下这里:http://www.cnblogs.com/dongxiao-yang/p/5417956.html

    3 heartBeat

    在发送完joinGroup后会启动heartBeat。HeartbeatTask实现了DelayedTask。heatbeat定时向coordinator发送心跳信息,如果返回ILLEGAL_GENERATION,说明coordinator已经重新进行了reblance,这个时候consuemr就需要再次发送join和sync请求。如下HeartbeatCompletionHandler

    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
            @Override
            public HeartbeatResponse parse(ClientResponse response) {
                return new HeartbeatResponse(response.responseBody());
            }
    
            @Override
            public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
                sensors.heartbeatLatency.record(response.requestLatencyMs());
                Errors error = Errors.forCode(heartbeatResponse.errorCode());
                if (error == Errors.NONE) {
                    log.debug("Received successful heartbeat response for group {}", groupId);
                    future.complete(null);
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                            groupId, coordinator);
                    coordinatorDead();
                    future.raise(error);
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
                    AbstractCoordinator.this.rejoinNeeded = true;
                    future.raise(Errors.REBALANCE_IN_PROGRESS);
                } else if (error == Errors.ILLEGAL_GENERATION) { // 服务端已经是新一代了,客户端需要reblance。
                    log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
                    AbstractCoordinator.this.rejoinNeeded = true; // rejoinNeeded置为true,下次拉取的时候会重新发送join和sync请求
                    future.raise(Errors.ILLEGAL_GENERATION);
                } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                    log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
                    memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                    AbstractCoordinator.this.rejoinNeeded = true;
                    future.raise(Errors.UNKNOWN_MEMBER_ID);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(groupId));
                } else {
                    future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
                }
            }
        }
    

    4 DelayedTask

    DelayedTask是一个接口,只有一个run方法,实现了DelayedTask的类只有两个:AutoCommitTask和HeartbeatTask。两个都是定时请求的任务。那么consumer单线程是如何实现定时提交的呢?原来Consumer会将AutoCommitTask和HeartbeatTask放在ConsumerNetworkClient#DelayedTaskQueue中,DelayedTaskQueue中包含一个PriorityQueue,会将DelayedTask封装成Entry并根据时间优先级排序。每次poll的时候都会从DelayedTaskQueue中获取第一个,根据第一个Entry剩余时间来确定poll阻塞时间。

    ConsumerNetworkClient调用schedule将DelayedTaskQueue放到ConsumerNetworkClient#DelayedTaskQueue中

    public void schedule(DelayedTask task, long at) {
            delayedTasks.add(task, at); // DelayedTaskQueue#add
    } 
    

    DelayedTaskQueue#add

    public class DelayedTaskQueue {
    
        private PriorityQueue<Entry> tasks; // 优先级队列
    
        public DelayedTaskQueue() {
            tasks = new PriorityQueue<Entry>();
        }
    
        /**
         * Schedule a task for execution in the future.
         *
         * @param task the task to execute
         * @param at the time at which to
         */
        public void add(DelayedTask task, long at) {
            tasks.add(new Entry(task, at));
        }
        // ...
    }
    

    AutoCommitTask和HeartbeatTask为了能够一直执行,会在回调函数中将自己重新加入到DelayedTaskQueue中,并指定下次执行的时间。这样就可以不停的执行了。以heartbeat为例

    private class HeartbeatTask implements DelayedTask {
    
            private boolean requestInFlight = false;
    
            public void reset() {
                // start or restart the heartbeat task to be executed at the next chance
                long now = time.milliseconds();
                heartbeat.resetSessionTimeout(now);
                client.unschedule(this);
    
                if (!requestInFlight)
                    client.schedule(this, now);
            }
    
            @Override
            public void run(final long now) {
                if (generation < 0 || needRejoin() || coordinatorUnknown()) {
                    // no need to send the heartbeat we're not using auto-assignment or if we are
                    // awaiting a rebalance
                    return;
                }
    
                if (heartbeat.sessionTimeoutExpired(now)) {
                    // we haven't received a successful heartbeat in one session interval
                    // so mark the coordinator dead
                    coordinatorDead();
                    return;
                }
    
                if (!heartbeat.shouldHeartbeat(now)) {
                    // we don't need to heartbeat now, so reschedule for when we do
                    client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
                } else {
                    heartbeat.sentHeartbeat(now);
                    requestInFlight = true;
    
                    RequestFuture<Void> future = sendHeartbeatRequest();
                    future.addListener(new RequestFutureListener<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            requestInFlight = false;
                            long now = time.milliseconds();
                            heartbeat.receiveHeartbeat(now);
                            long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
    
                            // 回调中再次加入,实现了循环定时执行
                            client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                        }
    
                        @Override
                        public void onFailure(RuntimeException e) {
                            requestInFlight = false;
                            client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                        }
                    });
                }
            }
        }
    

    5 updateFetchPositions

    updateFetchPositions 用于更新commited和offset信息。客户端的消费状态是保存在SubscriptionState中的。SubscriptionState有一下主要属性

    public class SubscriptionState {
        private Pattern subscribedPattern;
        // 消费者订阅的topic
        private final Set<String> subscription;
        private final Set<String> groupSubscription;
        private final Set<TopicPartition> userAssignment;
        // 消费状态
        private final Map<TopicPartition, TopicPartitionState> assignment;
        private boolean needsPartitionAssignment;
        private boolean needsFetchCommittedOffsets;
        private final OffsetResetStrategy defaultResetStrategy;
        private ConsumerRebalanceListener listener;
        // ...省略
    }
    
    private static class TopicPartitionState {
            private Long position; // 消费位置,从coordinator拉取的时候会带上该字段
            private OffsetAndMetadata committed;  // 已经提交的offset
            private boolean paused;  // whether this partition has been paused by the user
            private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
    }
    

    消费状态信息最终被保存在TopicPartitionState中,topicPartitionState中有两个重要的属性:committed和position。需要注意的是commited和position其实表示下一次需要消费的位置,比如0-10的offsetc都已经提交了,那么从coordinator拉取到的committed是11而不是10;position也是一样的,如果已经消费到15,那么position的值是16。更多可见consumer提交offset原理

    6 几个重要的参数

    1. fetch.min.bytes 一个parttion拉取的最小字节数。consumer是批量从broker拉取消息的,fetch.min.bytes表示最小拉取多少字节才返回。默认值是1
    2. fetch.max.wait.ms 拉取数据的时候最长等待时间,与fetch.min.bytes配合使用。等待fetch.max.wait.ms时间后,还没有得到fetch.min.bytes大小的数据则返回。默认值500.
    3. max.partition.fetch.bytes 一个partiton最多拉取字节数。默认值1048576,即1M。

    以上参数都是放到request中。如下Fetcher#createFetchRequests(..)

    private Map<Node, FetchRequest> createFetchRequests() {
            // create the fetch info
            Cluster cluster = metadata.fetch();
            Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
            for (TopicPartition partition : fetchablePartitions()) {
                Node node = cluster.leaderFor(partition);
                if (node == null) {
                    metadata.requestUpdate();
                } else if (this.client.pendingRequestCount(node) == 0) {
                    // if there is a leader and no in-flight requests, issue a new fetch
                    Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                    if (fetch == null) {
                        fetch = new HashMap<>();
                        fetchable.put(node, fetch);
                    }
    
                    long position = this.subscriptions.position(partition);
                    fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); // fetchSize即max.partition.fetch.bytes
                    log.trace("Added fetch request for partition {} at offset {}", partition, position);
                }
            }
    
            // create the fetches
            Map<Node, FetchRequest> requests = new HashMap<>();
            for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
                Node node = entry.getKey();
                // maxWaitMs即fetch.max.wait.ms,minBytes即fetch.min.byte
                FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
                requests.put(node, fetch);
            }
            return requests;
        }
    
    1. max.poll.records 返回的最大record数。与以上三个参数不同,该参数不会放到fetch request中,拉取的records会放在本地变量中,该参数表示将本地变量中多少records返回。

    Fetcher拉取的所有消息都会被放到放到records中,record是一个List,存放了所有partiton的record,max.poll.records参数就用来配置每次从list中返回多少条record的,注意是所有partiton的。

    Fetcher#fetchedRecords(..)

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
            if (this.subscriptions.partitionAssignmentNeeded()) {
                return Collections.emptyMap();
            } else {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
                throwIfOffsetOutOfRange();
                throwIfUnauthorizedTopics();
                throwIfRecordTooLarge();
    
                int maxRecords = maxPollRecords;
                Iterator<PartitionRecords<K, V>> iterator = records.iterator();
                while (iterator.hasNext() && maxRecords > 0) {
                    PartitionRecords<K, V> part = iterator.next();
                    maxRecords -= append(drained, part, maxRecords); // maxRecords就是max.poll.records
                    if (part.isConsumed())
                        iterator.remove();
                }
                return drained;
            }
        }
    
    1. 另外在调用consumer api的时候需要制定timeout时间,如果超过timeout仍然没有消息则返回空的records。
    while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000); // timeout时间
    //            System.out.println("begin for 2");
                for (ConsumerRecord<String, String> record : records) {
    //                System.out.println("hello");
                    System.out.println(record.partition() + " " + record.offset());
                }
            }
    
  • 相关阅读:
    element-ui 设置input的只读或禁用
    vue 获取后端数据打印结果undefined问题
    用yaml来编写配置文件
    [LeetCode] 28. 实现strStr()
    [LeetCode] 25. k个一组翻转链表
    [LeetCode] 26. 删除排序数组中的重复项
    [LeetCode] 24. 两两交换链表中的节点
    [LeetCode] 23. 合并K个排序链表
    [LeetCode] 21. 合并两个有序链表
    [LeetCode] 22. 括号生成
  • 原文地址:https://www.cnblogs.com/set-cookie/p/9062125.html
Copyright © 2011-2022 走看看