zoukankan      html  css  js  c++  java
  • Kafka-再均衡监听器

    Kafka-再均衡监听器

    在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用subscribe()方法时传进去一个ConsumerRebalanceListener实例就可以了。

    public void onPartitionsRevoked(Collection<TopicPartition> partitions)

    方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里读取了。

    public void onPartitionsAssigned(Collection<TopicPartition> partitions)

    方法会在重新分配分区之后和消费者开始读取消息之前被调用。

     代码如下

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @Author FengZhen
     * @Date 2020-04-06 11:07
     * @Description kafka消费者
     */
    public class KafkaConsumerTest {
        private static Properties kafkaProps = new Properties();
        static {
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("group.id", "test");
            kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
    
        private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    
        private static KafkaConsumer<String, String> consumer;/**
         * 指定偏移量提交
         */
        public static void commitSelfAppointWithRebalanceListener(){
            int count = 0;
    
            //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
            kafkaProps.put("enable.auto.commit", false);
            consumer = new KafkaConsumer<String, String>(kafkaProps);
            //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
            //如:test.*,订阅test相关的所有主题
            consumer.subscribe(Collections.singleton("test_partition"), new HandleRebalance());
            System.out.println("==== subscribe success ====");
            try {
                while (true){
                    //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                    //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                    //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                    //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    System.out.println("==== data get ====");
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                        if (count % 2 == 0){
                            //每2次提交一次,还可以根据时间间隔来提交
                            consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                                @Override
                                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                                    if (null != exception){
                                        System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception)));
                                    }
                                }
                            });
                        }
                        count++;
                    }
                    //异步提交(结合下方同步提交)
                    consumer.commitAsync();
                }
            } catch(Exception e){
                e.printStackTrace();
            } finally {
                //退出应用前使用close方法关闭消费者。
                //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
                consumer.close();
            }
        }
    
    
        /**
         * 再均衡监听器
         */
        private static class HandleRebalance implements ConsumerRebalanceListener{
    
            /**
             * 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里读取了。
             * @param partitions
             */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
                consumer.commitSync(currentOffsets);
            }
    
            /**
             * 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
             * @param partitions
             */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    
            }
        }
    
    }
  • 相关阅读:
    hdu 1003 dp最大子序列和
    模拟题 (+queue队列知识)
    hdu 1016 DFS
    OSGi 系列(二)之 Hello World
    OSGi 系列(一)之什么是 OSGi :Java 语言的动态模块系统
    Mina 系列(四)之KeepAliveFilter -- 心跳检测
    Mina 系列(三)之自定义编解码器.md
    Mina 系列(二)之基础
    Mina 快速入门
    Java 8 Optional 类深度解析
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/12642128.html
Copyright © 2011-2022 走看看