zoukankan      html  css  js  c++  java
  • Kafka-再均衡监听器

    Kafka-再均衡监听器

    在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用subscribe()方法时传进去一个ConsumerRebalanceListener实例就可以了。

    public void onPartitionsRevoked(Collection<TopicPartition> partitions)

    方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里读取了。

    public void onPartitionsAssigned(Collection<TopicPartition> partitions)

    方法会在重新分配分区之后和消费者开始读取消息之前被调用。

     代码如下

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @Author FengZhen
     * @Date 2020-04-06 11:07
     * @Description kafka消费者
     */
    public class KafkaConsumerTest {
        private static Properties kafkaProps = new Properties();
        static {
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("group.id", "test");
            kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
    
        private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    
        private static KafkaConsumer<String, String> consumer;/**
         * 指定偏移量提交
         */
        public static void commitSelfAppointWithRebalanceListener(){
            int count = 0;
    
            //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
            kafkaProps.put("enable.auto.commit", false);
            consumer = new KafkaConsumer<String, String>(kafkaProps);
            //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
            //如:test.*,订阅test相关的所有主题
            consumer.subscribe(Collections.singleton("test_partition"), new HandleRebalance());
            System.out.println("==== subscribe success ====");
            try {
                while (true){
                    //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                    //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                    //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                    //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    System.out.println("==== data get ====");
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                        if (count % 2 == 0){
                            //每2次提交一次,还可以根据时间间隔来提交
                            consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                                @Override
                                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                                    if (null != exception){
                                        System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception)));
                                    }
                                }
                            });
                        }
                        count++;
                    }
                    //异步提交(结合下方同步提交)
                    consumer.commitAsync();
                }
            } catch(Exception e){
                e.printStackTrace();
            } finally {
                //退出应用前使用close方法关闭消费者。
                //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
                consumer.close();
            }
        }
    
    
        /**
         * 再均衡监听器
         */
        private static class HandleRebalance implements ConsumerRebalanceListener{
    
            /**
             * 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里读取了。
             * @param partitions
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
                consumer.commitSync(currentOffsets);
            }
    
            /**
             * 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
             * @param partitions
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    
            }
        }
    
    }
  • 相关阅读:
    Apache Ant 1.9.1 版发布
    Apache Subversion 1.8.0rc2 发布
    GNU Gatekeeper 3.3 发布,网关守护管理
    Jekyll 1.0 发布,Ruby 的静态网站生成器
    R语言 3.0.1 源码已经提交到 Github
    SymmetricDS 3.4.0 发布,数据同步和复制
    beego 0.6.0 版本发布,Go 应用框架
    Doxygen 1.8.4 发布,文档生成工具
    SunshineCRM 20130518发布,附带更新说明
    Semplice Linux 4 发布,轻量级发行版
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/12642128.html
Copyright © 2011-2022 走看看