zoukankan      html  css  js  c++  java
  • kafka-独立消费者,可以消费指定partition的方式

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    
    /**
     * @Description kafka消费者
     */
    public class KafkaConsumerTest {
        
        /**
         * 独立消费者
         * 消费者可以为自己分配分区,不需要订阅主题,不会发生再均衡,没有群组概念
         * 弊端:如果主题新增了分区,消费者不会受到通知,所以,要么周期性的调用consumer.partitionsFor()方法来检查是否有新分区加入,要么在添加新分区后重启应用程序
         */
        public static void singleConsumer(){
            consumer = new KafkaConsumer<String, String>(Properties);
            List<PartitionInfo> partitionInfos = consumer.partitionsFor("test_topic");
            List<TopicPartition> partitions = new ArrayList<>();
    
            if (null != partitionInfos){
                for (PartitionInfo partitionInfo : partitionInfos) {
                   //添加需要的partition到集合中
                    partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
                //手动为消费者指定partition列表
                consumer.assign(partitions);
                try {
                    while (true){
                        //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                        //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                        //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                        //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        System.out.println("==== data get ====");
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                                    record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                        }
                    }
                } catch(Exception e){
                    e.printStackTrace();
                } finally {
                    //退出应用前使用close方法关闭消费者。
                    //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。
                    consumer.close();
                }
            }
        }
    
    }  
  • 相关阅读:
    6554545555
    484844
    学习资料整理
    word加上尾注之后参考文献下面的横线去除
    数据结构+算法
    python编程
    计算机网络(1)
    数据结构
    数据分析笔试(3)
    数据分析笔试(2)
  • 原文地址:https://www.cnblogs.com/bin-zhao/p/14346294.html
Copyright © 2011-2022 走看看