zoukankan      html  css  js  c++  java
  • kafka手动提交,丢失数据

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    // props.put("enable.auto.commit", "true");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("max.poll.records", "10");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));

        logger.info("topic订阅成功");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            logger.info("拉取条数:"+ records.count());
            try {
                for (ConsumerRecord<String, String> record : records){
                    logger.info("offset = "+record.offset()+", key = "+record.key()+", value ="+record.value());
    

    // consumer.commitSync();在这提交的化会有丢失数据的风险,例如只消费了一条,然后就挂机了,机器在上线的话后面的也消费不到
    }
    } catch (Exception e) {
    e.printStackTrace();
    }finally {
    //正确提交,为拉取的一批数据处理完后,一次性提交
    consumer.commitSync();
    }

        }
  • 相关阅读:
    工作实战之项目常用技术
    Thymeleaf的错误解决方式
    实用小demo
    idea常用的几个插件
    idea2019+Plugins中搜索不到任何插件解决办法
    git的初体验
    springboot2.+的整合log4j2错误解决浅谈
    MobaXterm百度网盘下载
    阿里云RDS云数据库连接步骤
    读源码学编程之——死循环妙用
  • 原文地址:https://www.cnblogs.com/xingrui/p/15417631.html
Copyright © 2011-2022 走看看