zoukankan      html  css  js  c++  java
  • 关于怎么获取kafka指定位置offset消息(转)

    1.在kafka中如果不设置消费的信息的话,一个消息只能被一个group.id消费一次,而新加如的group.id则会被“消费管理”记录,并指定从当前记录的消息位置开始向后消费。如果有段时间消费者关闭了,并有发送者发送消息那么下次这个消费者启动时也会接收到,但是我们如果想要从这个topic的第一条消息消费呢?

    public class SimpleConsumerPerSonIndex2 {
    public static void main(String[] args) throws Exception {
     
     
          //Kafka consumer configuration settings
          String topicName = "mypartition001";
          Properties props = new Properties();
          
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "partitiontest112");
          props.put("enable.auto.commit", "true"); 
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          
          //要发送自定义对象,需要指定对象的反序列化类
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
            KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
            Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
          hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0));
          consumer.commitSync(hashMaps);
          consumer.subscribe(Arrays.asList(topicName));
          while (true) {
          ConsumerRecords<String, Object> records = consumer.poll(100);
             for (ConsumerRecord<String, Object> record : records){
            System.out.println(record.toString());
             }
          }
          
       }
    }

    首先我们在consumer.subscribe(Arrays.asList(topicName));订阅一个topic之前要设置从这个topic的offset为0的地方获取。
    注意:这样的方法要保证这个group.id是新加入,如果是以前存在的,那么会抛异常。


    2.如果以前就存在的groupid想要获取指定的topic的offset为0开始之后的消息:

    public class SimpleConsumerPerSonIndex2 {
    public static void main(String[] args) throws Exception {
     
     
          //Kafka consumer configuration settings
          String topicName = "mypartition001";
          Properties props = new Properties();
          
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "partitiontest002");
          props.put("enable.auto.commit", "true"); 
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          
          //要发送自定义对象,需要指定对象的反序列化类
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
    // consumer.subscribe(Arrays.asList(topicName));
          consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
          consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改变当前offset
    //       consumer.seek(new TopicPartition(topicName, 0), 10);//不改变当前offset
         
          while (true) {
          ConsumerRecords<String, Object> records = consumer.poll(100);
             for (ConsumerRecord<String, Object> record : records){
            System.out.println(record.toString());
             }
          }
          
       }
    }

    使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));来分配topic和partition,
     而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定从这个topic和partition的开始位置获取。


    3.存在的groupid获取指定的topic任意的offset

    上面的代码放开 consumer.seek(new TopicPartition(topicName, 0), 10);//不改变当前offset
    并注释 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改变当前offset;
    其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示从这个topic的partition中的offset为10的开始获取消息。

    需要注意的是 consumer.assign()是不会被消费者的组管理功能管理的,他相对于是一个临时的,不会改变当前group.id的offset,比如:
    在使用consumer.subscribe(Arrays.asList(topicName));时offset为20,如果通过2和3,已经获取了最新的消息offset是最新的,
    在下次通过 consumer.subscribe(Arrays.asList(topicName));来获取消息时offset还是20.还是会获取20以后的消息。
    其实在2、3的结果截图中我们也可以发现没有1中结果图的joining group的日志输出,表示没有加入到group中。

    ————————————————
    版权声明:本文为CSDN博主「也是右移」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/u014104286/article/details/77103541/

  • 相关阅读:
    安全和加密
    awk
    CentOS7练习
    CentOS7系统引导顺序以及排障
    网络配置
    RAID阵列搭建
    LVM逻辑卷
    java-web——第九课 request
    java-web——第八课 JSTL的显示格式
    java-web——第七课 JSTL
  • 原文地址:https://www.cnblogs.com/muxi0407/p/11794799.html
Copyright © 2011-2022 走看看