  • Kafka consumer: manually committing message offsets

    Every time the consumer calls poll(), it returns messages that producers have written to Kafka but that this consumer has not yet read. If the consumer runs continuously, partition offsets are of little use. But if the consumer crashes, or a new consumer joins the group, a rebalance is triggered; after the rebalance, each consumer may be assigned new partitions rather than the ones it was processing before. To pick up where the previous owner left off, the consumer reads the last committed offset of each partition and resumes from that position. Consumers commit offsets by sending messages to the __consumer_offsets topic; each message carries the offset of one partition.
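
    Besides committing inside the poll loop, a consumer can also commit just before a rebalance takes its partitions away, by passing a ConsumerRebalanceListener to subscribe(). The sketch below is not from the original post; it reuses the placeholder broker address, group, and Demo3 topic of the examples that follow:

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    
    public class RebalanceAwareConsumer {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","ip:9092"); //placeholder address, as in the examples below
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id","DemoConsumerGroup");
            properties.put("enable.auto.commit","false");
    
            final KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            consumer.subscribe(Collections.singletonList("Demo3"), new ConsumerRebalanceListener() {
                //called before the rebalance removes partitions from this consumer
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    consumer.commitSync(); //persist progress so the next owner resumes correctly
                }
                //called after partitions are assigned; the position is restored from the committed offsets
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                }
            });
    
            //... poll loop as in the examples below ...
    
            consumer.close();
        }
    }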

    1. Synchronous commit

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Created by zhangpeiran on 2018/10/9.
     */
    public class MyConsumer {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","ip:9092");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id","DemoConsumerGroup");
    
            //Default is latest: when the partition has no committed offset for this group, or the offset is invalid, start from the newest records
            //A consumer group subscribing to a topic for the first time hits this case: messages produced before the consumer starts would not be read
            //Set to earliest to read from the beginning of the partition
            properties.put("auto.offset.reset","earliest");
    
            //Commit offsets manually instead of letting the consumer auto-commit
            properties.put("enable.auto.commit","false");
    
            //Maximum number of records returned by a single poll()
            properties.put("max.poll.records",10);
    
    
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            consumer.subscribe(Collections.singletonList("Demo3"));
    
            int count = 0;
            try {
                while (true){
                ConsumerRecords<String,String> records = consumer.poll(10); //wait up to 10 ms for new records
                    for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    //after the 50th record, commit the latest offsets returned by poll();
                    //commitSync() blocks, retrying if needed, until the commit succeeds or fails unrecoverably
                    if(count == 50)
                        consumer.commitSync();
                        System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                    }
                    System.out.println(count);
                }
            } finally {
                consumer.close();
            }
        }
    }

    Note: in the example above, topic Demo3 already holds 100 messages. The first time the consumer runs, it commits the offset once, after reading the 50th message, and the printed count reaches 100. Running it again with the same consumer group, it resumes from message 51, so count only reaches 50.
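
    One way to verify this is to ask the broker which offset the group has committed for a partition. The check below is a sketch, not in the original post, and assumes Demo3 has a partition 0:

    //requires org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata
    //committed() returns null if the group has not committed an offset for the partition yet
    OffsetAndMetadata committed = consumer.committed(new TopicPartition("Demo3", 0));
    System.out.println(committed == null ? "no commit yet" : "committed offset: " + committed.offset());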

    2. Asynchronous commit. A synchronous commit blocks until the broker responds, retrying on failure, which limits the application's throughput. An asynchronous commit avoids the blocking, and it does not retry on failure: if the failure was caused by a transient problem, a later commit will succeed anyway, while retrying an old commit could overwrite a newer committed offset.

    consumer.commitAsync();
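
    commitAsync() also accepts a callback that is invoked when the broker responds, which is a common place to at least log failed commits. A minimal sketch (the logging body is illustrative; it needs java.util.Map, org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata and org.apache.kafka.clients.consumer.OffsetCommitCallback):

    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null)
                System.err.println("commit failed for " + offsets + ": " + exception);
        }
    });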

    3. Combined synchronous and asynchronous commits, to make sure the last offsets are committed before the consumer shuts down or a rebalance is triggered

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Created by zhangpeiran on 2018/10/9.
     */
    public class MyConsumer {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","ip:9092");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id","DemoConsumerGroup");
    
            //Default is latest: when the partition has no committed offset for this group, or the offset is invalid, start from the newest records
            //A consumer group subscribing to a topic for the first time hits this case: messages produced before the consumer starts would not be read
            //Set to earliest to read from the beginning of the partition
            properties.put("auto.offset.reset","earliest");
    
            //Commit offsets manually instead of letting the consumer auto-commit
            properties.put("enable.auto.commit","false");
    
            //Maximum number of records returned by a single poll()
            properties.put("max.poll.records",10);
    
    
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            consumer.subscribe(Collections.singletonList("Demo3"));
    
            int count = 0;
            try {
                while (true){
                ConsumerRecords<String,String> records = consumer.poll(10); //wait up to 10 ms for new records
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                //normal path: fast, non-blocking commit; a failed async commit is not retried
                consumer.commitAsync();
                }
            } finally {
                try {
                    //final blocking commit: retries until it succeeds or fails unrecoverably,
                    //so the last processed offsets are saved before closing
                    consumer.commitSync();
                } finally {
                    consumer.close();
                }
            }
        }
    }

     4. Committing specific offsets. The commits above always commit the last offset returned by poll(), and a single poll() may return a large batch, so after a rebalance many messages could be processed twice. The consumer API therefore also accepts an explicit map of partitions and offsets to commit

    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * Created by zhangpeiran on 2018/10/9.
     */
    public class MyConsumer {
    
        public static void main(String[] args){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","ip:9092");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("group.id","DemoConsumerGroup");
    
            //Default is latest: when the partition has no committed offset for this group, or the offset is invalid, start from the newest records
            //A consumer group subscribing to a topic for the first time hits this case: messages produced before the consumer starts would not be read
            //Set to earliest to read from the beginning of the partition
            properties.put("auto.offset.reset","earliest");
    
            //Commit offsets manually instead of letting the consumer auto-commit
            properties.put("enable.auto.commit","false");
    
            //Maximum number of records returned by a single poll()
            properties.put("max.poll.records",1000);
    
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            consumer.subscribe(Collections.singletonList("Demo5"));
    
            int count = 0;
            Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition,OffsetAndMetadata>();
            try {
                while (true){
                    ConsumerRecords<String,String> records = consumer.poll(10); //wait up to 10 ms for new records
                    for(ConsumerRecord<String ,String> record : records){
                        count ++;
    
                        //store offset + 1: the committed offset is the position the next fetch of this partition starts from
                        currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"no"));
    
                        //this condition is true only when count == 10, so each run commits exactly once, after its first 10 records
                        if ((count / 10 == 1) && (count % 10 == 0)){
                            System.out.println(count);
                            consumer.commitSync(currentOffsets);
                        }
                        System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                    }
                }
            } finally {
                //no final commitSync() here, on purpose: only the offsets committed after the
                //first 10 records of a run are persisted, which produces the behavior described below
                consumer.close();
            }
        }
    }

    The producer has written 100 messages to Demo5. With the code above, starting and then stopping the consumer 10 times in a row reads 100, 90, 80, ..., 10 messages per run. The reason is that each run commits the offset exactly once, after reading its first 10 records, so every run persists a starting point only 10 messages further along.

  • Original post: https://www.cnblogs.com/darange/p/9768791.html