代码改变世界
[登录 · 注册]
  • kafka auto.offset.reset latest earliest 详解
  •  

    一,latest和earliest区别

    1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

    2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

    提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

    二,创建topic

    1. # bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank  
    2. Created topic "tank".  
    3.   
    4. # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank  
    5. Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:  
    6.  Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2  
    7.  Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0  
    8.  Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1  

    三,生产数据和接收生产数据

    1. [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank  
    2. >1  
    3. >2  
    4. >3  
    5. >4  
    6. >5  
    7. >6  
    8. 。。。。。。。。。省略。。。。。。。。。  
    9. [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning  
    10. 1  
    11. 2  
    12. 3  
    13. 4  
    14. 5  
    15. 6  
    16. 。。。。。。。。省略。。。。。。。。  

    四,测试代码

    1. object tank {  
    2.     def main(args: Array[String]): Unit = {  
    3.         val pros: Properties = new Properties  
    4.         pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")  
    5.         /*分组由消费者决定,完全自定义,没有要求*/  
    6.         pros.put("group.id", "tank")  
    7.         //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic  
    8.         pros.put("enable.auto.commit", "false")  
    9.         pros.put("auto.commit.interval.ms", "1000")  
    10.         pros.put("max.poll.records", "5")  
    11.         pros.put("session.timeout.ms", "30000")  
    12.         //只有当offset不存在的时候,才用latest或者earliest  
    13.         pros.put("auto.offset.reset", "latest")  
    14.   
    15.         pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
    16.         pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
    17.   
    18.         val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)  
    19.   
    20.         /*这里填写主题名称*/  
    21.         consumer.subscribe(util.Arrays.asList("tank"))  
    22.   
    23.         val system = akka.actor.ActorSystem("system")  
    24.         system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))  
    25.   
    26.     }  
    27.   
    28.     object tankTest {  
    29.         def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {  
    30.             val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))  
    31.             if (!records.isEmpty) {  
    32.                 for (record <- records) {  
    33.                     if (record.value != null && !record.value.equals("")) {  
    34.                         myLog.syncLog(record.value + " 准备开启消费者出列数据", "kafka", "get")  
    35.                     }  
    36.                 }  
    37.                 consumer.commitSync()  
    38.   
    39.             }  
    40.   
    41.         }  
    42.     }  
    43. }  

    五,测试1,过程如下

    1,查看offset

    1. # bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe  
    2. Error: Consumer group 'tank' does not exist.  

    在没有提交offset的情况,会报这个错误

    2,latest模式运行,拉取不到数据

    2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.
    2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.
    2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.

    3,再用kafka-console-producer.sh生产数据,latest是可以拉到的,并且是拉取最新的数据(程序运行以后的数据),以前提交的数据是拉取不到的。

    4,查看offset不报错了

    1. # bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe  
    2. Consumer group 'tank' has no active members.  
    3.   
    4. TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID  
    5. tank            1          12              14              2               -               -               -  
    6. tank            0          12              14              2               -               -               -  
    7. tank            2          13              15              2               -               -               -  

    5,将auto.offset.reset设置成earliest,第一次生产的数据也取不到

    在这里要注意:如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以

    六,测试2

    1,重新创建topic,重复上面的第二,第三步

    2,代码端先earliest,最早提交的数据是可以获取到的,再生产数据也是可以获取到的。

    3,将auto.offset.reset设置成latest,再生产数据也是可以获取到的。

    七,结论

    虽然auto.offset.reset默认是latest,但是建议使用earliest。

    参考链接:http://blog.51yip.com/hadoop/2130.html

  • 上一篇:Elasticsearch5.0 安装问题集锦
    下一篇:干货 | Elasticsearch多表关联设计指南
  • 【推广】 阿里云小站-上云优惠聚集地(新老客户同享)更有每天限时秒杀!
    【推广】 云服务器低至0.95折 1核2G ECS云服务器8.1元/月
    【推广】 阿里云老用户升级四重礼遇享6.5折限时折扣!
  • 原文:https://www.cnblogs.com/xiohao/p/12827066.html
走看看 - 开发者的网上家园