  • Kafka Study Notes 017 --- Consumer Configuration Parameter max.poll.interval.ms

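    The max.poll.interval.ms parameter specifies the maximum allowed interval between two consecutive poll() calls made by a consumer (default: 5 minutes). If that interval is exceeded, the consumer client proactively sends a LeaveGroup request to the coordinator, which triggers a rebalance; the consumer then sends a new JoinGroup request to rejoin the group.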
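The check described above can be modeled with a few lines of plain Java. This is only a simplified sketch to illustrate the timing rule: the class PollIntervalTracker and its methods are hypothetical names, and it does not reproduce the Kafka client's internal code.

```java
/**
 * Simplified model of the poll-interval check. The class and method names
 * are hypothetical; this is not the Kafka client's internal implementation.
 */
final class PollIntervalTracker {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTracker(long maxPollIntervalMs, long startMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = startMs;
    }

    /** Called whenever the application invokes poll(). */
    void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True once more than maxPollIntervalMs has passed since the last poll. */
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Same numbers as the example in this post: a 10000 ms limit, 11000 ms of work.
        PollIntervalTracker tracker = new PollIntervalTracker(10_000, 0);
        tracker.recordPoll(0);
        System.out.println(tracker.pollTimeoutExpired(9_000));   // still within the limit
        System.out.println(tracker.pollTimeoutExpired(11_000));  // limit exceeded
    }
}
```

In the real client, the expired state is what leads to the LeaveGroup request; the model only shows the comparison between elapsed time and the configured limit.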

    An example follows:

    1. Set max.poll.interval.ms to 10000 ms

    public static KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Maximum allowed gap between two poll() calls: 10000 ms (the default is 5 minutes)
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        return new KafkaConsumer<>(properties);
    }

    2. The consumer thread sleeps for 11000 ms to simulate an interval between two polls that exceeds max.poll.interval.ms

    @Slf4j
    public class MyConsumer {
    
        public static void main(String[] args) throws InterruptedException {
            KafkaConsumer<String, String> consumer = KafkaTestUtil.createConsumer();
            consumer.subscribe(Collections.singleton(KafkaTestContants.FIRST_TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    log.warn("consume msg: topic={}, partition={}, offset={}, msg={}", record.topic(), record.partition(), record.offset(), record.value());
                }
                // Sleep longer than max.poll.interval.ms (10000 ms) so that the poll timeout expires
                Thread.sleep(11000);
            }
        }
    }

    3. Execution result

    09:34:46,858 [kafka-coordinator-heartbeat-thread | group1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-da40b0d7-9dda-484d-9722-b512bf351c56 sending LeaveGroup request to coordinator 192.168.1.8:9092 (id: 2147483645 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Failing OffsetCommit request since the consumer is not part of an active group
    09:34:47,979 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Synchronous auto-commit of offsets {topic1-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, topic1-0=OffsetAndMetadata{offset=4, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
    09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Lost previously assigned partitions topic1-1, topic1-0
    09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
    09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
    09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
    09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 22: {consumer-group1-1-32cfa301-a26c-4f3a-b270-9bb1bbc51fef=Assignment(partitions=[topic1-0, topic1-1])}
    09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation 22

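    4. Usage recommendations

    (1) In practice, the time spent processing the records returned by one poll should not exceed max.poll.interval.ms; otherwise a rebalance is triggered.

    (2) If processing the consumed records is slow, try lowering max.poll.records to reduce the number of records fetched per poll (the default is 500).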
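One way to pick a value for max.poll.records is to estimate the processing time per record and keep a whole batch well inside max.poll.interval.ms. The sketch below is a rough illustration only: the PollTuning class, the 50 ms per-record estimate, and the 50% safety budget are assumptions made for the example, not values from the original post. Only the two configuration keys are real Kafka consumer settings.

```java
import java.util.Properties;

/** Hypothetical helper for sizing max.poll.records from a processing-time estimate. */
final class PollTuning {

    /**
     * Largest batch size whose estimated processing time stays within
     * budgetPercent of maxPollIntervalMs. Always returns at least 1.
     */
    static int maxRecordsWithinBudget(long maxPollIntervalMs, long perRecordMs, int budgetPercent) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = maxPollIntervalMs * budgetPercent / 100;
        long records = budgetMs / perRecordMs;
        return (int) Math.max(1, Math.min(records, Integer.MAX_VALUE));
    }

    /** Builds consumer properties using an assumed 50% safety budget. */
    static Properties tunedProperties(long maxPollIntervalMs, long perRecordMs) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records",
                Integer.toString(maxRecordsWithinBudget(maxPollIntervalMs, perRecordMs, 50)));
        return props;
    }

    public static void main(String[] args) {
        // Assumed workload: about 50 ms per record, with the 10000 ms interval from the example.
        Properties props = tunedProperties(10_000, 50);
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```

The resulting properties can be merged into the ones passed to the KafkaConsumer constructor. The per-record estimate should come from measurements of the actual workload, since a single slow record can still push a batch past the limit.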

  • Original article: https://www.cnblogs.com/sniffs/p/13205196.html