zoukankan      html  css  js  c++  java
  • spring kafka消费者配置介绍----ackMode

    当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;

    ackMode有以下7种值:

    public enum AckMode {
            // 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
            RECORD,
            // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
            BATCH,
            // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
            TIME,
            // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
            COUNT,
            // TIME | COUNT 有一个条件满足时提交
            COUNT_TIME,
            // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
            MANUAL,
            // 手动调用Acknowledgment.acknowledge()后立即提交
            MANUAL_IMMEDIATE,
        }

    如果设置 AckMode 模式为 MANUAL 或者 MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交;

    在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer的run()方法和processCommits()方法使用:

    public void run() {
      ......
         while (isRunning()) {
             try {
                if (!this.autoCommit && !this.isRecordAck) { // 当 autoCommit 为false时且 ackMode不是record时 调用 processCommits 方法,判断如何手动提交
              processCommits();
            }
            processSeeks();
            ......
        }  }
    }
    private void processCommits() {
                this.count += this.acks.size();  // acks 是一个LinkedBlockingQueue类型的阻塞队列,存放从kafka读取到的record数据
                handleAcks();
                long now;
                AckMode ackMode = this.containerProperties.getAckMode();
                if (!this.isManualImmediateAck) {  // 不是使用者手动调用
                    if (!this.isManualAck) {
                        updatePendingOffsets();
                    }
                    boolean countExceeded = this.count >= this.containerProperties.getAckCount(); 
                    if (this.isManualAck || this.isBatchAck || this.isRecordAck
                            || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                        ......
                        commitIfNecessary();
                        this.count = 0;
                    }
                    else {
                        now = System.currentTimeMillis();
                        boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                        if (ackMode.equals(AckMode.TIME) && elapsed) {
                            ......
                   c
    ommitIfNecessary(); this.last = now; } else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {               ...... commitIfNecessary(); this.last = now; this.count = 0; } } } }
  • 相关阅读:
    等宽布局和flex
    antd按需加载
    linux-redis cluster集群(redis5.x)
    linux-mysql-主从同步
    mysql-行转列
    Spring Bean 作用域
    ArrayList、LinkedList区别(jdk8)
    java类及实例初始化顺序
    线程池-结构
    GIT基础
  • 原文地址:https://www.cnblogs.com/super-jing/p/12194185.html
Copyright © 2011-2022 走看看