  • How the Kafka consumer commits offsets

    1 Data structures

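    The consumer's consumption state is kept in the SubscriptionState class. Its most important field is assignment, which holds the partitions assigned to the consumer and the state of each of those partitions.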

    public class SubscriptionState {
    
        /* the pattern user has requested */
        private Pattern subscribedPattern;
    
        /* the list of topics the user has requested */
        private final Set<String> subscription;
    
        /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
        private final Set<String> groupSubscription;
    
        /* the list of partitions the user has requested */
        private final Set<TopicPartition> userAssignment;
    
        /* the list of partitions currently assigned */
        private final Map<TopicPartition, TopicPartitionState> assignment; // key field: the assigned partitions and the state of each one
    
        // ...
    }
    

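    Next, TopicPartitionState. It records how far the consumer has got in a partition. Note that position is the offset of the next record to consume, not of the last record already consumed; when fetching, position determines the offset of the first record to request.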

    private static class TopicPartitionState {
        private Long position; // offset of the next record to consume
        private OffsetAndMetadata committed;  // offset accepted by the last successful commit
        private boolean paused;  // whether this partition has been paused by the user
        private OffsetResetStrategy resetStrategy;  // strategy used when position has to be reset

        // ...
    }
    
    public class OffsetAndMetadata implements Serializable {
        private final long offset;
        private final String metadata;
    }
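    To make the difference between position and committed concrete, here is a minimal model of the two fields. It is a simplified sketch under stated assumptions, not Kafka's class: the name PartitionStateModel and the methods recordReturned and markCommitted are hypothetical, committed is a plain offset rather than an OffsetAndMetadata, and the paused/resetStrategy fields are left out.

```java
import java.util.OptionalLong;

/** Hypothetical, simplified model of TopicPartitionState; not Kafka's actual class. */
final class PartitionStateModel {
    // Offset of the NEXT record to fetch; null until a position is known.
    private Long position;
    // Offset accepted by the last successful commit; null if nothing was committed yet.
    private Long committed;

    // Sets the starting position, e.g. from a committed offset or a reset strategy.
    void seek(long offset) {
        position = offset;
    }

    // Called once the record at `offset` has been returned to the application:
    // the next record to fetch is therefore offset + 1.
    void recordReturned(long offset) {
        position = offset + 1;
    }

    // Called when the coordinator accepts a commit of `offset`.
    void markCommitted(long offset) {
        committed = offset;
    }

    boolean hasValidPosition() {
        return position != null;
    }

    OptionalLong position() {
        return position == null ? OptionalLong.empty() : OptionalLong.of(position);
    }

    OptionalLong committed() {
        return committed == null ? OptionalLong.empty() : OptionalLong.of(committed);
    }
}
```

    After seek(0) and returning the records at offsets 0 through 4, position is 5 (the next record to fetch) while committed stays empty until a commit succeeds.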
    

    2 Committing offsets

    Let's use KafkaConsumer#commitSync as an example of how the client commits offsets.

    KafkaConsumer#commitSync

    public void commitSync() {
        acquire();
        try {
            // SubscriptionState#allConsumed returns the positions consumed so far; commit them
            commitSync(subscriptions.allConsumed());
        } finally {
            release();
        }
    }
    
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        acquire();
        try {
            coordinator.commitOffsetsSync(offsets);
        } finally {
            release();
        }
    }
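    When calling commitSync(Map) yourself, the same convention applies: the value to commit is the offset of the next record to consume, i.e. the offset of the last processed record plus one, as the KafkaConsumer Javadoc describes. The sketch below builds such a map without depending on kafka-clients; Tp is a hypothetical stand-in for TopicPartition and Commits.fromLastProcessed is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Tp)) return false;
        Tp other = (Tp) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class Commits {
    /**
     * Turns "offset of the last record the application finished processing"
     * into the value to commit: the offset of the next record to consume.
     */
    static Map<Tp, Long> fromLastProcessed(Map<Tp, Long> lastProcessed) {
        Map<Tp, Long> toCommit = new HashMap<>(lastProcessed.size());
        for (Map.Entry<Tp, Long> e : lastProcessed.entrySet()) {
            toCommit.put(e.getKey(), e.getValue() + 1);
        }
        return toCommit;
    }
}
```

    With the real client the equivalent call is along the lines of consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1))). Committing record.offset() without the + 1 would make the last processed record be fetched again after a restart.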
    

    2.1 Getting the consumed positions

    Where does SubscriptionState#allConsumed get the consumed positions from? The code below shows that the offset being committed is simply TopicPartitionState#position.

    public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
        Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
            TopicPartitionState state = entry.getValue();
            if (state.hasValidPosition())
                // key line: the position is wrapped in an OffsetAndMetadata,
                // i.e. what gets committed is TopicPartitionState#position
                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
        }
        return allConsumed;
    }
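    The same logic in a self-contained form: partitions are identified by plain strings such as "orders-0" and the state holds only position. AllConsumedSketch and its nested State class are hypothetical names for this sketch. A partition without a valid position (for example, one that was just assigned and has not been initialized yet) is skipped, so it contributes nothing to the commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of SubscriptionState#allConsumed; names are hypothetical. */
final class AllConsumedSketch {
    /** Per-partition state reduced to the fetch position (null = no valid position). */
    static final class State {
        final Long position;

        State(Long position) {
            this.position = position;
        }

        boolean hasValidPosition() {
            return position != null;
        }
    }

    /** Collects the current position of every assigned partition that has one. */
    static Map<String, Long> allConsumed(Map<String, State> assignment) {
        Map<String, Long> allConsumed = new HashMap<>();
        for (Map.Entry<String, State> entry : assignment.entrySet()) {
            State state = entry.getValue();
            if (state.hasValidPosition()) {
                // What gets committed is the position, i.e. the NEXT offset to fetch.
                allConsumed.put(entry.getKey(), state.position);
            }
        }
        return allConsumed;
    }
}
```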
    

    2.2 Sending over the network

    Once the consumed offsets have been collected, they are eventually sent to the coordinator by ConsumerCoordinator#sendOffsetCommitRequest.

    
    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (coordinatorUnknown()) // the coordinator must be known first
            return RequestFuture.coordinatorNotAvailable();

        if (offsets.isEmpty())
            return RequestFuture.voidSuccess();

        // create the offset commit request
        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            // request data: keyed by TopicPartition, value built from the OffsetAndMetadata
            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
        }

        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                this.generation,
                this.memberId,
                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                offsetData);

        log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

        // send to the coordinator
        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                .compose(new OffsetCommitResponseHandler(offsets));
    }
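    The payload-building part of sendOffsetCommitRequest can be sketched without any networking. This is a hypothetical model: CommitRequestSketch, PartitionData and Request only mirror the shape of OffsetCommitRequest, partitions are plain strings, the retention time is omitted, and both early exits (unknown coordinator, nothing to commit) are collapsed into returning null, whereas the real code distinguishes them through the returned RequestFuture.

```java
import java.util.HashMap;
import java.util.Map;

/** Network-free sketch of the payload built by sendOffsetCommitRequest (names are hypothetical). */
final class CommitRequestSketch {
    /** Stands in for OffsetCommitRequest.PartitionData: offset plus a free-form metadata string. */
    static final class PartitionData {
        final long offset;
        final String metadata;

        PartitionData(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    /** Stands in for OffsetCommitRequest (retention time omitted for brevity). */
    static final class Request {
        final String groupId;
        final int generation;
        final String memberId;
        final Map<String, PartitionData> offsetData;

        Request(String groupId, int generation, String memberId, Map<String, PartitionData> offsetData) {
            this.groupId = groupId;
            this.generation = generation;
            this.memberId = memberId;
            this.offsetData = offsetData;
        }
    }

    /**
     * Returns null when nothing should be sent: the coordinator is unknown
     * (real code: coordinatorNotAvailable) or there is nothing to commit
     * (real code: immediate success).
     */
    static Request build(boolean coordinatorKnown, String groupId, int generation,
                         String memberId, Map<String, Long> offsets) {
        if (!coordinatorKnown || offsets.isEmpty()) {
            return null;
        }
        Map<String, PartitionData> offsetData = new HashMap<>(offsets.size());
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            // Metadata is left empty in this sketch; only the offset matters here.
            offsetData.put(e.getKey(), new PartitionData(e.getValue(), ""));
        }
        // generation and memberId let the coordinator reject commits from a
        // member that is no longer part of the current group generation.
        return new Request(groupId, generation, memberId, offsetData);
    }
}
```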
    
    

    2.3 Handling the response

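    The last line of the code above shows that the response is handled by OffsetCommitResponseHandler. If the commit succeeds, TopicPartitionState#committed is set to the offset that was sent, i.e. the value TopicPartitionState#position had when the request was built.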

    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();

            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                // this.offsets is the argument passed to sendOffsetCommitRequest; this is the key point
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();

                Errors error = Errors.forCode(entry.getValue());
                if (error == Errors.NONE) {
                    if (subscriptions.isAssigned(tp))
                        // set TopicPartitionState#committed to the offset that was sent,
                        // i.e. TopicPartitionState#position at the time the request was built
                        subscriptions.committed(tp, offsetAndMetadata);
                }
                // ...
            }
        }
    }
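    Because the handler reads from this.offsets rather than from the current position, committed reflects what was actually sent even if position has moved on in the meantime (for example with an asynchronous commit, where polling continues before the response arrives). The sketch below models only the success path; CommitResponseSketch and its State class are hypothetical, partitions are strings, and error codes are shorts where 0 stands for Errors.NONE, mirroring the Map<TopicPartition, Short> returned by responseData().

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the success path in OffsetCommitResponseHandler#handle (names are hypothetical). */
final class CommitResponseSketch {
    static final short NONE = 0; // Errors.NONE has error code 0

    /** Minimal per-partition consumer state. */
    static final class State {
        long position;   // next offset to fetch; may advance while a commit is in flight
        Long committed;  // last offset the coordinator accepted, null if none

        State(long position) {
            this.position = position;
        }
    }

    /**
     * For every partition the coordinator accepted, record the offset that was SENT,
     * and only if the partition is still assigned to this consumer.
     */
    static void handle(Map<String, Short> responseData,
                       Map<String, Long> sentOffsets,
                       Map<String, State> assignment) {
        for (Map.Entry<String, Short> entry : responseData.entrySet()) {
            String tp = entry.getKey();
            long offset = sentOffsets.get(tp);
            if (entry.getValue() == NONE) {
                State state = assignment.get(tp);
                if (state != null) {
                    state.committed = offset;
                }
            }
            // Non-zero error codes are handled separately in the real handler (elided).
        }
    }
}
```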
    

    3 Summary

    1. TopicPartitionState#position is the offset of the next message to consume.
    2. Committing offsets means sending TopicPartitionState#position to the coordinator.
    3. Once the commit succeeds, TopicPartitionState#committed is set to the position that was sent.
  • Original article: https://www.cnblogs.com/set-cookie/p/9752514.html