zoukankan      html  css  js  c++  java
  • Kafka java消费者拉取消息源码

    消费者Demo

     1 import org.apache.kafka.clients.consumer.ConsumerRecord;
     2 import org.apache.kafka.clients.consumer.ConsumerRecords;
     3 import org.apache.kafka.clients.consumer.KafkaConsumer;
     4 import org.apache.kafka.common.serialization.StringDeserializer;
     5 
     6 import java.util.Arrays;
     7 import java.util.Properties;
     8 
     9 public class ConsumerDemo { // Demo consumer: subscribes to a single topic and prints every record it polls.
    10     private final KafkaConsumer<String, String> consumer;
    11     private ConsumerRecords<String, String> msgList; // batch returned by the most recent poll()
    12     private final String topic;
    13     private static final String GROUPID = "groupA";
    14     /** Creates a consumer in group {@code GROUPID} against localhost:9092 and subscribes it to {@code topicName}. */
    15     public ConsumerDemo(String topicName) {
    16         Properties props = new Properties();
    17         props.put("bootstrap.servers", "localhost:9092");
    18         props.put("group.id", GROUPID);
    19         props.put("enable.auto.commit", "true");
    20         props.put("auto.commit.interval.ms", "1000");
    21         props.put("session.timeout.ms", "30000");
    22         props.put("auto.offset.reset", "earliest"); // with no committed offset for the group, start from the oldest record
    23         props.put("key.deserializer", StringDeserializer.class.getName());
    24         props.put("value.deserializer", StringDeserializer.class.getName());
    25         this.consumer = new KafkaConsumer<>(props);
    26         this.topic = topicName;
    27         this.consumer.subscribe(Arrays.asList(topic));
    28     }
    29 
    30     /** Polls in an endless loop, printing each record with a running sequence number; the consumer is always closed on exit. */
    31     public void receiveMsg() {
    32         int messageNo = 1;
    33         System.out.println("---------开始消费---------");
    34         try {
    35             for (;;) {
    36                 msgList = consumer.poll(1000);
    37                 if (msgList != null && msgList.count() > 0) {
    38                     for (ConsumerRecord<String, String> record : msgList) {
    39                         System.out.println((messageNo++) + "=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); // post-increment so each record gets its own number
    40                     }
    41                 } else {
    42                     Thread.sleep(1000); // empty batch: back off before polling again
    43                 }
    44             }
    45         } catch (InterruptedException e) {
    46             Thread.currentThread().interrupt(); // restore the interrupt flag instead of only printing the stack trace
    47         } finally {
    48             consumer.close();
    49         }
    50     }
    51     public static void main(String[] args) { // entry point: consumes the "KAFKA_TEST" topic
    52         ConsumerDemo consumerDemo = new ConsumerDemo("KAFKA_TEST");
    53         consumerDemo.receiveMsg();
    54     }
    55 }

    Demo 第 36 行调用 consumer.poll(1000) 开始从 Kafka 服务器拉取消息,下面进入 KafkaConsumer.java 源码中的 poll 方法:

     1     @Override
     2     public ConsumerRecords<K, V> poll(long timeout) { // returns the first non-empty batch from pollOnce(), or an empty result once `timeout` ms have elapsed
     3         acquireAndEnsureOpen(); // paired with release() in the finally block; presumably guards single-threaded use and checks the consumer is open - confirm
     4         try {
     5             if (timeout < 0)
     6                 throw new IllegalArgumentException("Timeout must not be negative");
     7 
     8             if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
     9                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
    10 
    11             // poll for new data until the timeout expires
    12             long start = time.milliseconds();
    13             long remaining = timeout;
    14             do {
    15                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
    16                 if (!records.isEmpty()) {
    17                     // before returning the fetched records, we can send off the next round of fetches
    18                     // and avoid block waiting for their responses to enable pipelining while the user
    19                     // is handling the fetched records.
    20                     //
    21                     // NOTE: since the consumed position has already been updated, we must not allow
    22                     // wakeups or any other errors to be triggered prior to returning the fetched records.
    23                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
    24                         client.pollNoWakeup();
    25                     // hand the batch to the interceptors only when some are configured
    26                     if (this.interceptors == null)
    27                         return new ConsumerRecords<>(records);
    28                     else
    29                         return this.interceptors.onConsume(new ConsumerRecords<>(records));
    30                 }
    31                 // nothing fetched in this round: recompute the remaining time budget and try again
    32                 long elapsed = time.milliseconds() - start;
    33                 remaining = timeout - elapsed;
    34             } while (remaining > 0);
    35             // the timeout ran out without any records arriving
    36             return ConsumerRecords.empty();
    37         } finally {
    38             release();
    39         }
    40     }

    poll 方法第 15 行调用 pollOnce() 拉取消息,下面进入 pollOnce() 方法:

     1     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { // one poll round: return already-fetched records if any, else send fetches, wait on the client, and return what arrived (possibly empty)
     2         client.maybeTriggerWakeup();
     3         coordinator.poll(time.milliseconds(), timeout);
     4 
     5         // fetch positions if we have partitions we're subscribed to that we
     6         // don't know the offset for
     7         if (!subscriptions.hasAllFetchPositions())
     8             updateFetchPositions(this.subscriptions.missingFetchPositions());
     9 
    10         // if data is available already, return it immediately
    11         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    12         if (!records.isEmpty())
    13             return records;
    14 
    15         // send any new fetches (won't resend pending fetches)
    16         fetcher.sendFetches();
    17         // cap the wait at the smaller of coordinator.timeToNextPoll(now) and the caller's timeout
    18         long now = time.milliseconds();
    19         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    20 
    21         client.poll(pollTimeout, now, new PollCondition() {
    22             @Override
    23             public boolean shouldBlock() {
    24                 // since a fetch might be completed by the background thread, we need this poll condition
    25                 // to ensure that we do not block unnecessarily in poll()
    26                 return !fetcher.hasCompletedFetches();
    27             }
    28         });
    29 
    30         // after the long poll, we should check whether the group needs to rebalance
    31         // prior to returning data so that the group can stabilize faster
    32         if (coordinator.needRejoin())
    33             return Collections.emptyMap();
    34         // second attempt: return whatever became available during client.poll() above (may be empty)
    35         return fetcher.fetchedRecords();
    36     }

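    pollOnce() 第 11 行先通过 fetcher.fetchedRecords() 读取本地已缓存的消息;如果本地没有消息,第 16 行调用 fetcher.sendFetches() 重新向服务器发送拉取请求,第 21 行 client.poll() 等待响应,最后第 35 行再次调用 fetcher.fetchedRecords() 返回拉取到的消息。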

    Please call me JiangYouDang!
  • 相关阅读:
    nodejs简单用法一
    初识ege图形库
    安装Microsoft Visual 2010 sp1回滚的错误,无Windows Installer目录
    MongoDB学习要点
    在java中,静态的内部类和非静态内部类的区别
    医院监护系统的问题定义及分析系统可行性及数据定义方法
    机票预订系统的问题定义及分析系统可行性
    银行储蓄系统问题:问题定义及分析系统可行性
    银行储蓄系统:问题定义及分析系统可行性
    软件开发的早期阶段为什么进行可行性研究?应该从哪些方面研究系统的可行性?
  • 原文地址:https://www.cnblogs.com/luckygxf/p/15383633.html
Copyright © 2011-2022 走看看