zoukankan      html  css  js  c++  java
  • kafka max.poll.interval.ms配置太短

    max.poll.interval.ms这个应该是消费者每次去kafka拉取数据最大间隔,如果超过这个间隔,服务端会认为消费者已离线。触发rebalance

    demo
     1     public ConsumerDemo(String topicName) {
     2         Properties props = new Properties();
     3         props.put("bootstrap.servers", "localhost:9092");
     4         props.put("group.id", GROUPID);
     5         props.put("enable.auto.commit", "false");
     6         props.put("max.poll.interval.ms", "1000");
     7         props.put("auto.offset.reset", "earliest");
     8         props.put("key.deserializer", StringDeserializer.class.getName());
     9         props.put("value.deserializer", StringDeserializer.class.getName());
    10         this.consumer = new KafkaConsumer<String, String>(props);
    11         this.topic = topicName;
    12         this.consumer.subscribe(Arrays.asList(topic));
    13     }

    5行配置自动提交为false,手动提交。6行配置 max.poll.interval.ms为1秒

     1     public void receiveMsg() {
     2         int messageNo = 1;
     3         System.out.println("---------开始消费---------");
     4         try {
     5             for (;;) {
     6                 msgList = consumer.poll(1000);
     7                 System.out.println("start sleep" + System.currentTimeMillis() / 1000);
     8                 Thread.sleep(10000);
     9                 System.out.println("end sleep" + System.currentTimeMillis() / 1000);
    10                 if(null!=msgList&&msgList.count()>0){
    11                     for (ConsumerRecord<String, String> record : msgList) {
    12                         System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
    13                     }
    14                 }else{
    15                     Thread.sleep(1000);
    16                 }
    17                 consumer.commitSync();
    18             }
    19         } catch (InterruptedException e) {
    20             e.printStackTrace();
    21         } finally {
    22             consumer.close();
    23         }
    24     }

    8行slepp 10秒,模拟处理消息耗时。提交消息的时候报错

    Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1211)
        at com.gxf.kafka.ConsumerDemo.receiveMsg(ConsumerDemo.java:49)
        at com.gxf.kafka.ConsumerDemo.main(ConsumerDemo.java:59)

    max.poll.interval.ms 可以配置稍微大点,或者减少处理时间,每次少拉取数据,异步处理等

    Please call me JiangYouDang!
  • 相关阅读:
    [Windwos Phone 7] Accelerometer
    [Windwos Phone 7] 获取设备相关信息
    实现ZUNE上软件商城的软件星级推荐效果
    [Windows Phone 7]如何判断手机是否有网络连接
    [Windows Phone 7]UI对屏幕的自适应的处理
    [Windows Phone 7]开发分享图片的插件(2)
    [Windows Phone 7]如何导航页面和页面间传值
    windows phone 从cer中提出公钥然后再RSA加密的问题
    异步上传文件插件AjaxFileUploader在Asp.net MVC中应用
    微软认证考试分析
  • 原文地址:https://www.cnblogs.com/luckygxf/p/15468693.html
Copyright © 2011-2022 走看看