zoukankan      html  css  js  c++  java
  • kafka auto.offset.reset参数解析

    auto.offset.reset关乎kafka数据的读取。常用的二个值是latest和earliest,默认是latest。

    如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二都都可以。

    earliest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    latest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    none
    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    1.latest和earliest区别

    1. earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    2. latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

    提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

    2.创建topic

    # bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank  
    Created topic "tank".  
      
    # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank  
    Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:  
     Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2  
     Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0  
     Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1  
    

    3.生产数据和接收生产数据

    [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank  
    >1  
    >2  
    >3  
    。。。。。。。。。省略。。。。。。。。。  
    [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning  
    1  
    2  
    3  
    

    4.测试代码

    object tank {  
        def main(args: Array[String]): Unit = {  
            val pros: Properties = new Properties  
            pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")  
            /*分组由消费者决定,完全自定义,没有要求*/  
            pros.put("group.id", "tank")  
            //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic  
            pros.put("enable.auto.commit", "false")  
            pros.put("auto.commit.interval.ms", "1000")  
            pros.put("max.poll.records", "5")  
            pros.put("session.timeout.ms", "30000")  
            //只有当offset不存在的时候,才用latest或者earliest  
            pros.put("auto.offset.reset", "latest")  
      
            pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
            pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")  
      
            val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)  
      
            /*这里填写主题名称*/  
            consumer.subscribe(util.Arrays.asList("tank"))  
      
            val system = akka.actor.ActorSystem("system")  
            system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))  
      
        }  
      
        object tankTest {  
            def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {  
                val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))  
                if (!records.isEmpty) {  
                    for (record <- records) {  
                        if (record.value != null && !record.value.equals("")) {  
                            myLog.syncLog(record.value + "	准备开启消费者出列数据", "kafka", "get")  
                        }  
                    }  
                    consumer.commitSync()  
      
                }  
      
            }  
        }  
    }  
    
  • 相关阅读:
    一些端口
    outlook 的微软手册
    目录摘要
    L2TP的包过滤规则
    outlook 的外出时助理程序对外部邮箱不起作用。1个解决办法和另外一个可能性
    用editplus 正则表达式修改联系人表
    Cisco NAT的理解。
    outlook 2003 无法记住密码
    ERD commander 2005的下载地址。
    outlook 2003启用日志记录排除故障。
  • 原文地址:https://www.cnblogs.com/aixing/p/13327354.html
Copyright © 2011-2022 走看看