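  • Kafka Principles and Practice (4): spring-kafka Consumer Source Code

    Series index

    Kafka Principles and Practice (1): Principles, a 10-minute introduction

    Kafka Principles and Practice (2): spring-kafka in simple practice

    Kafka Principles and Practice (3): spring-kafka producer source code

    Kafka Principles and Practice (4): spring-kafka consumer source code

    Kafka Principles and Practice (5): spring-kafka configuration in detail

    Kafka Principles and Practice (6): Summary and takeaways

    ==============Main text=====================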

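    I. The KafkaConsumer consumer model

    As the figure above shows, the main flow of the spring-kafka consumer model is:

    1. The container starts and polls in a loop to consume messages.

    2. KafkaConsumer pulls messages:

    1) The Fetcher builds fetch requests and stores them in unsent.

    2) ConsumerNetworkClient runs poll() and calls NetworkClient.send(), which takes each ClientRequest out of unsent, converts it to a RequestSend, and finally places it in the Selector's KafkaChannel. Selector.send() then pulls the ConsumerRecords to be consumed from the Kafka cluster.

    3. The consumer listener's MessageListener.onMessage() runs the user-defined business logic that actually consumes the message.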
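
    The three steps above can be modelled without any Kafka dependency. The following is only a minimal sketch: PollLoopSketch, its in-memory queue (standing in for the Kafka cluster) and the maxEmptyPolls cut-off are all hypothetical names introduced for illustration, not spring-kafka or kafka-clients classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a listener container; not a spring-kafka class. */
class PollLoopSketch {
    private final BlockingQueue<String> cluster;  // stands in for the Kafka cluster
    private final Consumer<String> listener;      // stands in for MessageListener.onMessage()
    private volatile boolean running = true;

    PollLoopSketch(BlockingQueue<String> cluster, Consumer<String> listener) {
        this.cluster = cluster;
        this.listener = listener;
    }

    /** Step 2: wait up to timeoutMs for a first record, then drain whatever else is ready. */
    List<String> poll(long timeoutMs) {
        List<String> records = new ArrayList<>();
        try {
            String first = cluster.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                records.add(first);
                cluster.drainTo(records);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt and stop polling
            running = false;
        }
        return records;
    }

    /** Steps 1 and 3: keep polling while running and hand every record to the listener. */
    void run(int maxEmptyPolls) {
        int emptyPolls = 0;
        while (running && emptyPolls < maxEmptyPolls) {
            List<String> records = poll(10);
            if (records.isEmpty()) {
                emptyPolls++;  // the sketch gives up after a few empty polls so it terminates
                continue;
            }
            emptyPolls = 0;
            records.forEach(listener);
        }
    }

    void stop() {
        running = false;
    }
}
```

    A real container loops until it is stopped; the empty-poll limit exists only so that the sketch ends on its own.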

    I. KafkaConsumer construction

    @SuppressWarnings("unchecked")
    private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
            log.debug("Starting the Kafka consumer");
            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
            int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            this.time = new SystemTime();

            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
            Map<String, String> metricsTags = new LinkedHashMap<>();
            metricsTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

            // load interceptors and make sure they get clientId
            Map<String, Object> userProvidedConfigs = config.originals();
            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = keyDeserializer;
            }
            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), 0);
            String metricGrpPrefix = "consumer";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient netClient = new NetworkClient(
                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                    this.metadata,
                    clientId,
                    100, // a fixed large enough value will suffice
                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            List<PartitionAssignor> assignors = config.getConfiguredInstances(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    PartitionAssignor.class);
            this.coordinator = new ConsumerCoordinator(this.client,
                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    retryBackoffMs,
                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
            this.fetcher = new Fetcher<>(this.client,
                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                    this.keyDeserializer,
                    this.valueDeserializer,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    this.retryBackoffMs);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

            log.debug("Kafka consumer created");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

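    Judging from the KafkaConsumer constructor, the core components are:

    1. Metadata: a class that encapsulates the metadata logic. It keeps metadata for only a subset of topics, and topics can be added to that subset over time. Requesting metadata for a topic we have no metadata for triggers a metadata update. If topic expiry is enabled for the metadata, then after an update any topic that was not used within the expiry interval is removed from the metadata refresh set.

    2. ConsumerNetworkClient: higher-level consumer access to the network layer, with basic support for request futures. The class is thread-safe, but it provides no synchronization for response callbacks, which guarantees that no locks are held while they are invoked.

    3. SubscriptionState: maintains the offset state of the subscribed TopicPartitions.

    4. ConsumerCoordinator: the consumer's coordinator, responsible for partition assignment and rebalancing.

    5. Fetcher: fetches messages from the brokers according to the configuration.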
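
    Before any of these components is built, the constructor shown above runs two small guards: it rejects a request.timeout.ms that is not larger than session.timeout.ms and fetch.max.wait.ms, and it generates a client id when none is configured. The sketch below restates both in isolation. ConsumerConfigGuards is a hypothetical helper, not part of kafka-clients, and the starting value of its sequence counter is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that mirrors two guards from the constructor; not part of kafka-clients. */
class ConsumerConfigGuards {
    // Assumption: the real sequence's starting value is not shown in the excerpt.
    private static final AtomicInteger CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    /** request.timeout.ms must be strictly greater than both other timeouts. */
    static void checkTimeouts(int requestTimeoutMs, int sessionTimeoutMs, int fetchMaxWaitMs) {
        if (requestTimeoutMs <= sessionTimeoutMs || requestTimeoutMs <= fetchMaxWaitMs) {
            throw new IllegalArgumentException(
                    "request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms");
        }
    }

    /** An empty client.id is replaced by "consumer-" plus a sequence number. */
    static String resolveClientId(String configured) {
        if (configured == null || configured.isEmpty()) {
            return "consumer-" + CLIENT_ID_SEQUENCE.getAndIncrement();
        }
        return configured;
    }
}
```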

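    II. Consumer container startup flow

    There are two common ways to set up a Kafka consumer:

    1. An XML configuration file

    2. Annotations

    Whichever you choose, the only real difference is how the Spring beans get created. We will trace the source code using the XML approach.

    The overall XML configuration is as follows: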

    <!-- 1. Define the consumer parameters -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="key.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                    value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 2. Create the consumerFactory bean -->
    <bean id="consumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 3. Define the consumer implementation class -->
    <bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />

    <!-- 4. Consumer container configuration -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- topic -->
        <constructor-arg name="topics">
            <list>
                <value>${kafka.consumer.topic.credit.for.lease}</value>
                <value>${loan.application.feedback.topic}</value>
                <value>${templar.agreement.feedback.topic}</value>
                <value>${templar.aggrement.active.feedback.topic}</value>
                <value>${templar.aggrement.agreementRepaid.topic}</value>
                <value>${templar.aggrement.agreementWithhold.topic}</value>
                <value>${templar.aggrement.agreementRepayRemind.topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerService" />
    </bean>
    <!-- 5. Concurrent message listener container; runs doStart() as its init method -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <property name="concurrency" value="${concurrency}" />
    </bean>

    This breaks down into 5 steps:

    2.1. Define the consumer parameter bean

    consumerProperties is simply a Map of keys to values.

    2.2. Create the consumerFactory bean

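    DefaultKafkaConsumerFactory implements the ConsumerFactory interface, which offers two methods: create a consumer, and report whether auto commit is enabled. The factory is constructed with consumerProperties as its argument.

    public interface ConsumerFactory<K, V> {

        Consumer<K, V> createConsumer();

        boolean isAutoCommit();

    }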

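    2.3. Define the consumer implementation class

    Write a class that implements the MessageListener interface. The interface is designed as follows:

    Implement onMessage to consume the messages received. There are two options:

    1) MessageListener: the offset is committed automatically after the message is consumed (when enable.auto.commit=true). This is more efficient, but carries the risk that the offset moves forward even though consumption failed.

    2) AcknowledgingMessageListener: the offset is committed manually after the message is consumed (when enable.auto.commit=false). This is less efficient, but the offset never moves past a message whose consumption failed.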
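
    The trade-off between the two options can be shown with a deliberately simplified model. OffsetCommitSketch below is hypothetical and uses no Kafka classes: a handler returns true on success, the "auto commit" variant advances past every delivered record, and the "manual ack" variant advances only past acknowledged ones. Real auto commit is periodic, so treat this as an illustration of the risk rather than of the mechanism.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of offset movement; not how kafka-clients commits offsets internally. */
class OffsetCommitSketch {

    /** Auto-commit model: the offset moves past every delivered record, failed or not. */
    static long consumeAutoCommit(List<String> records, Predicate<String> handler) {
        long committed = 0;
        for (String record : records) {
            handler.test(record);  // result ignored: a failure does not hold the offset back
            committed++;
        }
        return committed;
    }

    /** Manual-ack model: the offset moves only past records the handler acknowledged. */
    static long consumeManualAck(List<String> records, Predicate<String> handler) {
        long committed = 0;
        for (String record : records) {
            if (!handler.test(record)) {
                break;  // not acknowledged: stop here so the record can be read again
            }
            committed++;
        }
        return committed;
    }
}
```

    With records a, b, c and a handler that fails on b, the first method reports offset 3 (b is lost), while the second stops at offset 1.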

    2.4. Listener container configuration

    ContainerProperties holds the runtime configuration of a listener container. It mainly defines the topics and partitions to listen to, the initial offsets, and the message listener.

    public class ContainerProperties {

        private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

        private static final int DEFAULT_QUEUE_DEPTH = 1;

        private static final int DEFAULT_PAUSE_AFTER = 10000;

        /**
         * Topic names: the array of topics to listen to.
         */
        private final String[] topics;

        /**
         * Topic pattern: the pattern of topics to listen to.
         */
        private final Pattern topicPattern;

        /**
         * Topics/partitions/initial offsets.
         */
        private final TopicPartitionInitialOffset[] topicPartitions;

        /**
         * Ack mode (used when auto commit is false):
         * <ul>
         * <li>1. RECORD: ack after each record has been passed to the listener</li>
         * <li>2. BATCH: ack after the batch of records received by the consumer has been passed to the listener</li>
         * <li>3. TIME: ack once the configured time in milliseconds has elapsed (should be greater than
         * {@code #setPollTimeout(long) pollTimeout})</li>
         * <li>4. COUNT: ack once the specified number of records has been received</li>
         * <li>5. MANUAL: the listener is responsible for acking (AcknowledgingMessageListener)</li>
         * </ul>
         */
        private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

        /**
         * The number of outstanding record count after which offsets should be
         * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
         * used.
         */
        private int ackCount;

        /**
         * The time (ms) after which outstanding offsets should be committed when
         * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
         * larger than
         */
        private long ackTime;

        /**
         * The message listener; must be either a MessageListener or an AcknowledgingMessageListener.
         */
        private Object messageListener;

        /**
         * The max time to block in the consumer waiting for records.
         */
        private volatile long pollTimeout = 1000;

        /**
         * Task executor that polls the consumer.
         */
        private AsyncListenableTaskExecutor consumerTaskExecutor;

        /**
         * Task executor that invokes the listener.
         */
        private AsyncListenableTaskExecutor listenerTaskExecutor;

        /**
         * Error handler, called when the listener throws an exception.
         */
        private GenericErrorHandler<?> errorHandler;

        /**
         * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
         * true, the delay after which the consumer should be paused. Default 10000.
         */
        private long pauseAfter = DEFAULT_PAUSE_AFTER;

        /**
         * When true, avoids rebalancing when this consumer is slow or throws a
         * qualifying exception - pauses the consumer. Default: true.
         * @see #pauseAfter
         */
        private boolean pauseEnabled = true;

        /**
         * Set the queue depth for handoffs from the consumer thread to the listener
         * thread. Default 1 (up to 2 in process).
         */
        private int queueDepth = DEFAULT_QUEUE_DEPTH;

        /**
         * Timeout for stopping the container.
         */
        private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

        /**
         * A user-defined consumer rebalance listener implementation.
         */
        private ConsumerRebalanceListener consumerRebalanceListener;

        /**
         * The commit callback; by default it logs the result.
         */
        private OffsetCommitCallback commitCallback;

        /**
         * Whether or not to call consumer.commitSync() or commitAsync() when the
         * container is responsible for commits. Default true. See
         * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
         * writing, async commits are not entirely reliable.
         */
        private boolean syncCommits = true;

        private boolean ackOnError = true;

        private Long idleEventInterval;

        public ContainerProperties(String... topics) {
            Assert.notEmpty(topics, "An array of topicPartitions must be provided");
            this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
            this.topicPattern = null;
            this.topicPartitions = null;
        }

        public ContainerProperties(Pattern topicPattern) {
            this.topics = null;
            this.topicPattern = topicPattern;
            this.topicPartitions = null;
        }

        public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
            this.topics = null;
            this.topicPattern = null;
            Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
            this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
                    .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
        }

        // ... setters and getters omitted

    }
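
    The ack modes documented on the ackMode, ackCount and ackTime fields can be read as one decision: given what has happened since the last commit, should the container commit now? The sketch below is a hypothetical model of that decision, not the spring-kafka implementation. AckModeSketch, its parameter names, and the exact comparison operators are assumptions made for illustration.

```java
/** Hypothetical model of the ack modes described in the Javadoc; not the spring-kafka code. */
class AckModeSketch {
    enum Mode { RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL }

    /**
     * @param endOfBatch   true once every record of the current poll has reached the listener
     * @param pendingCount records handled since the last commit
     * @param elapsedMs    milliseconds since the last commit
     */
    static boolean shouldCommit(Mode mode, boolean endOfBatch,
                                int pendingCount, int ackCount,
                                long elapsedMs, long ackTimeMs) {
        boolean countReached = pendingCount >= ackCount;
        boolean timeElapsed = elapsedMs > ackTimeMs;
        switch (mode) {
            case RECORD:     return true;                        // after every record
            case BATCH:      return endOfBatch;                  // after the whole batch
            case TIME:       return timeElapsed;
            case COUNT:      return countReached;
            case COUNT_TIME: return countReached || timeElapsed; // whichever comes first
            default:         return false;                       // MANUAL: the listener decides
        }
    }
}
```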

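    2.5. Start the concurrent message listener container

    The core class is ConcurrentMessageListenerContainer, which extends the abstract class AbstractMessageListenerContainer. The class diagram is as follows:

    As the diagram above shows, AbstractMessageListenerContainer has two implementations, one for single-threaded and one for multi-threaded consumption; multi-threaded consumption is recommended. Below we analyze the main class, ConcurrentMessageListenerContainer. Two methods deserve attention:

    1. The constructor, which takes the consumer factory (ConsumerFactory) and the container configuration (ContainerProperties).

    2. doStart(), whose core is the call to start() on each KafkaMessageListenerContainer. The source is as follows: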

    public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

        private final ConsumerFactory<K, V> consumerFactory;

        private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

        private int concurrency = 1;

        /**
         * Construct an instance with the supplied configuration properties.
         * The topic partitions are distributed evenly across the delegate
         * {@link KafkaMessageListenerContainer}s.
         * @param consumerFactory the consumer factory.
         * @param containerProperties the container properties.
         */
        public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                ContainerProperties containerProperties) {
            super(containerProperties);
            Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
            this.consumerFactory = consumerFactory;
        }

        public int getConcurrency() {
            return this.concurrency;
        }

        /**
         * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
         * Messages from within the same partition will be processed sequentially.
         * @param concurrency the concurrency.
         */
        public void setConcurrency(int concurrency) {
            Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
            this.concurrency = concurrency;
        }

        /**
         * Return the list of {@link KafkaMessageListenerContainer}s created by
         * this container.
         * @return the list of {@link KafkaMessageListenerContainer}s created by
         * this container.
         */
        public List<KafkaMessageListenerContainer<K, V>> getContainers() {
            return Collections.unmodifiableList(this.containers);
        }

        /*
         * Under lifecycle lock.
         */
        @Override
        protected void doStart() {
            if (!isRunning()) {
                ContainerProperties containerProperties = getContainerProperties();
                TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
                // If concurrency exceeds the number of partitions, log a warning and cap it.
                if (topicPartitions != null
                        && this.concurrency > topicPartitions.length) {
                    this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                            + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                            + topicPartitions.length);
                    this.concurrency = topicPartitions.length; // concurrency can be at most the number of partitions
                }
                setRunning(true);
                // Create one listener container per unit of concurrency.
                for (int i = 0; i < this.concurrency; i++) {
                    KafkaMessageListenerContainer<K, V> container;
                    if (topicPartitions == null) {
                        container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                    }
                    else {
                        container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                                partitionSubset(containerProperties, i));
                    }
                    if (getBeanName() != null) {
                        container.setBeanName(getBeanName() + "-" + i);
                    }
                    if (getApplicationEventPublisher() != null) {
                        container.setApplicationEventPublisher(getApplicationEventPublisher());
                    }
                    container.setClientIdSuffix("-" + i);
                    container.start(); // the core call: start the container
                    this.containers.add(container);
                }
            }
        }
        // ... omitted
    }
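
    doStart() caps the concurrency at the number of explicitly assigned partitions and gives each child container a subset via partitionSubset(...), whose body is not shown here. The sketch below is one possible way to split partitions evenly under the same cap. PartitionSplitSketch is a hypothetical class, and the contiguous-range strategy is an assumption that may differ from what partitionSubset actually does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical even split of partition indexes across containers; not the library's partitionSubset. */
class PartitionSplitSketch {

    /** Concurrency can be at most the number of partitions. */
    static int effectiveConcurrency(int requested, int partitionCount) {
        return Math.min(requested, partitionCount);
    }

    /** Returns one list of partition indexes per container, in contiguous ranges. */
    static List<List<Integer>> split(int partitionCount, int requestedConcurrency) {
        if (partitionCount <= 0 || requestedConcurrency <= 0) {
            throw new IllegalArgumentException("partitionCount and concurrency must be greater than 0");
        }
        int containers = effectiveConcurrency(requestedConcurrency, partitionCount);
        int base = partitionCount / containers;   // every container gets at least this many
        int extra = partitionCount % containers;  // the first 'extra' containers get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < containers; i++) {
            int size = base + (i < extra ? 1 : 0);
            List<Integer> subset = new ArrayList<>();
            for (int j = 0; j < size; j++) {
                subset.add(next++);
            }
            result.add(subset);
        }
        return result;
    }
}
```

    For example, 5 partitions with a concurrency of 2 yields [[0, 1, 2], [3, 4]], and 3 partitions with a requested concurrency of 7 yields only 3 containers.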

    Tracing further, container.start() is AbstractMessageListenerContainer's start(), which delegates to doStart(). Note that start() and stop() take the same lock, which guards the container's lifecycle.

    private final Object lifecycleMonitor = new Object();

    @Override
    public final void start() {
        synchronized (this.lifecycleMonitor) {
            Assert.isTrue(
                    this.containerProperties.getMessageListener() instanceof KafkaDataListener,
                    "A " + KafkaDataListener.class.getName() + " implementation must be provided");
            doStart();
        }
    }

    protected abstract void doStart();
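
    The pattern here is a template method guarded by one monitor: the final start() and stop() methods take the lock, and subclasses only supply doStart() and doStop(). The sketch below reproduces the pattern with hypothetical classes (LifecycleSketch, CountingContainer). Unlike the source above, it keeps the running flag in the base class, which is a simplification.

```java
/** Hypothetical base class showing start() and stop() sharing one lifecycle lock. */
abstract class LifecycleSketch {
    private final Object lifecycleMonitor = new Object();
    private boolean running;

    public final void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                doStart();
                this.running = true;
            }
        }
    }

    public final void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                doStop();
                this.running = false;
            }
        }
    }

    public final boolean isRunning() {
        synchronized (this.lifecycleMonitor) {
            return this.running;
        }
    }

    protected abstract void doStart();

    protected abstract void doStop();
}

/** Hypothetical subclass that only counts how often the hooks run. */
class CountingContainer extends LifecycleSketch {
    int starts;
    int stops;

    @Override
    protected void doStart() {
        this.starts++;
    }

    @Override
    protected void doStop() {
        this.stops++;
    }
}
```

    Because both methods synchronize on the same object, a stop() issued from another thread cannot interleave with a start() that is still in progress.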

    What finally runs is KafkaMessageListenerContainer's doStart():

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        // 1. Build the listener consumer.
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        // 2. Submit the listener-consumer task asynchronously and keep the returned Future.
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

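    doStart() mainly does two things: it constructs the inner class ListenerConsumer, and it submits the listener-consumer task, storing the returned Future.

    1. Construct the inner class ListenerConsumer

    The ListenerConsumer class diagram is as follows:

    The source of the ListenerConsumer constructor is as follows: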

    @SuppressWarnings("unchecked")
    ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
        Assert.state(!this.isAnyManualAck || !this.autoCommit,
                "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
        @SuppressWarnings("deprecation")
        final Consumer<K, V> consumer =
                KafkaMessageListenerContainer.this.consumerFactory instanceof
                                org.springframework.kafka.core.ClientIdSuffixAware
                        ? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
                                .this.consumerFactory)
                                    .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
                        : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();

        this.theListener = listener == null ? ackListener : listener;
        ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

        if (KafkaMessageListenerContainer.this.topicPartitions == null) {
            if (this.containerProperties.getTopicPattern() != null) {
                consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
            }
            else {
                consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
            }
        }
        else {
            List<TopicPartitionInitialOffset> topicPartitions =
                    Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
            this.definedPartitions = new HashMap<>(topicPartitions.size());
            for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
                this.definedPartitions.put(topicPartition.topicPartition(),
                        new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
            }
            consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
        }
        this.consumer = consumer;
        GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
        this.genericListener = listener;
        // 1.
        if (this.theListener instanceof BatchAcknowledgingMessageListener) {
            this.listener = null;
            this.batchListener = null;
            this.acknowledgingMessageListener = null;
            this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener<K, V>) this.theListener;
            this.isBatchListener = true;
        }
        // 2.
        else if (this.theListener instanceof AcknowledgingMessageListener) {
            this.listener = null;
            this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) this.theListener;
            this.batchListener = null;
            this.batchAcknowledgingMessageListener = null;
            this.isBatchListener = false;
        }
        // 3.
        else if (this.theListener instanceof BatchMessageListener) {
            this.listener = null;
            this.batchListener = (BatchMessageListener<K, V>) this.theListener;
            this.acknowledgingMessageListener = null;
            this.batchAcknowledgingMessageListener = null;
            this.isBatchListener = true;
        }
        // 4.
        else if (this.theListener instanceof MessageListener) {
            this.listener = (MessageListener<K, V>) this.theListener;
            this.batchListener = null;
            this.acknowledgingMessageListener = null;
            this.batchAcknowledgingMessageListener = null;
            this.isBatchListener = false;
        }
        else {
            throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                    + "'BatchMessageListener', 'AcknowledgingMessageListener', "
                    + "'BatchAcknowledgingMessageListener', not " + this.theListener.getClass().getName());
        }
        if (this.isBatchListener) {
            validateErrorHandler(true);
            this.errorHandler = new LoggingErrorHandler();
            this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler()
                    : (BatchErrorHandler) errHandler;
        }
        else {
            validateErrorHandler(false);
            this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler;
            this.batchErrorHandler = new BatchLoggingErrorHandler();
        }
        Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
    }

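    The constructor does two things:

    1. Sets up the consumer to subscribe to topics or to use the explicitly assigned partitions.

    2. Sets the listener. Four kinds are supported:

      1) BatchAcknowledgingMessageListener: a batch listener that requires acknowledgment

      2) AcknowledgingMessageListener: a listener that requires acknowledgment

      3) BatchMessageListener: a batch listener

      4) MessageListener: a plain message listener (the most commonly used; consumes one message at a time)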

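    2. Submit the listener-consumer task (ListenerConsumer) and store the returned Future.

    Here we look at the task's run() method from the Runnable interface. There are two cases:

    1. If partitions were assigned explicitly, there is no need for a rebalance to assign partitions, so the callback is invoked directly.

    2. If no partitions were specified, the thread goes straight into the polling loop. (The loop also runs in case 1, after the partitions have been initialized.)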

    @Override
    public void run() {
        if (this.genericListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
        }
        this.count = 0;
        this.last = System.currentTimeMillis();
        // 1. Running with explicitly assigned partitions: no rebalance will assign them, so start the invoker here
        if (isRunning() && this.definedPartitions != null) {
            initPartitionsIfNeeded(); // initialize the partitions if required
            // start the invoker, but only when not auto-committing
            if (!this.autoCommit) {
                startInvoker();
            }
        }
        long lastReceive = System.currentTimeMillis();
        long lastAlertAt = lastReceive;
        // 2. Spin loop: poll and consume while the container is running
        while (isRunning()) {
            try {
                if (!this.autoCommit) {
                    processCommits(); // manual commit mode: process pending commits
                }
                processSeeks(); // reposition the offsets used by the next poll
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Polling (paused=" + this.paused + ")...");
                }
                // 1) fetch records
                ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                if (records != null && this.logger.isDebugEnabled()) {
                    this.logger.debug("Received: " + records.count() + " records");
                }
                if (records != null && records.count() > 0) {
                    if (this.containerProperties.getIdleEventInterval() != null) {
                        lastReceive = System.currentTimeMillis();
                    }
                    // 2) auto-commit enabled: invoke the listener on the current thread
                    if (this.autoCommit) {
                        invokeListener(records);
                    }
                    else {
                        // 3) otherwise hand the records to the buffer queue
                        if (sendToListener(records)) {
                            if (this.assignedPartitions != null) {
                                // avoid group management rebalance due to a slow
                                // consumer
                                this.consumer.pause(this.assignedPartitions);
                                this.paused = true;
                                this.unsent = records;
                            }
                        }
                    }
                }
                else {
                    if (this.containerProperties.getIdleEventInterval() != null) {
                        long now = System.currentTimeMillis();
                        if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                            publishIdleContainerEvent(now - lastReceive);
                            lastAlertAt = now;
                            if (this.genericListener instanceof ConsumerSeekAware) {
                                seekPartitions(getAssignedPartitions(), true);
                            }
                        }
                    }
                }
                this.unsent = checkPause(this.unsent);
            }
            catch (WakeupException e) {
                this.unsent = checkPause(this.unsent);
            }
            catch (Exception e) {
                if (this.containerProperties.getGenericErrorHandler() != null) {
                    this.containerProperties.getGenericErrorHandler().handle(e, null);
                }
                else {
                    this.logger.error("Container exception", e);
                }
            }
        }
        if (this.listenerInvokerFuture != null) {
            stopInvoker();
            commitManualAcks();
        }
        try {
            this.consumer.unsubscribe();
        }
        catch (WakeupException e) {
            // No-op. Continue process
        }
        this.consumer.close();
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Consumer stopped");
        }
    }
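
    The idle-event condition in the else branch of the loop is easier to read as a pure function. This is a minimal sketch; the class and method names are made up for illustration and only the comparison itself comes from the listing.

```java
final class IdleCheckSketch {

    // True when nothing has been received for longer than the interval and the
    // previous idle event is also older than the interval.
    static boolean shouldPublishIdleEvent(long now, long lastReceive, long lastAlertAt, long idleInterval) {
        return now > lastReceive + idleInterval && now > lastAlertAt + idleInterval;
    }

    public static void main(String[] args) {
        long interval = 5_000L;
        // Idle for just over 5 s and no earlier alert: publish.
        System.out.println(shouldPublishIdleEvent(6_001L, 1_000L, 1_000L, interval)); // true
        // Still idle, but an event was published 999 ms ago: stay quiet.
        System.out.println(shouldPublishIdleEvent(7_000L, 1_000L, 6_001L, interval)); // false
    }
}
```

    The second comparison is what keeps a long idle period from publishing an event on every empty poll: at most one event is published per interval.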

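    1. If the user explicitly assigned partitions and auto-commit is off, an asynchronous thread is started to run the ListenerInvoker task. The source is:

    private void startInvoker() {
        ListenerConsumer.this.invoker = new ListenerInvoker();
        ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
                .submit(ListenerConsumer.this.invoker);
    }

    The executor then runs ListenerInvoker's run method. The method is invoked once and loops for as long as active is true; the CountDownLatch initialized to 1 is counted down once when the loop exits, signalling that the invoker has stopped.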

    private final class ListenerInvoker implements SchedulingAwareRunnable {

        private final CountDownLatch exitLatch = new CountDownLatch(1);

        private volatile boolean active = true;

        private volatile Thread executingThread;

        ListenerInvoker() {
            super();
        }

        @Override
        public void run() {
            Assert.isTrue(this.active, "This instance is not active anymore");
            if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
            }
            try {
                this.executingThread = Thread.currentThread();
                while (this.active) {
                    try {
                        // take pending records from the blocking queue (LinkedBlockingQueue recordsToProcess)
                        ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                                TimeUnit.SECONDS);
                        if (this.active) {
                            if (records != null) {
                                invokeListener(records); // consume
                            }
                            else {
                                if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                    ListenerConsumer.this.logger.trace("No records to process");
                                }
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        if (!this.active) {
                            Thread.currentThread().interrupt();
                        }
                        else {
                            ListenerConsumer.this.logger.debug("Interrupt ignored");
                        }
                    }
                }
            }
            finally {
                this.active = false;
                this.exitLatch.countDown();
            }
        }

        @Override
        public boolean isLongLived() {
            return true;
        }
    }

    private void invokeListener(final ConsumerRecords<K, V> records) {
        if (this.isBatchListener) {
            invokeBatchListener(records);
        }
        else {
            invokeRecordListener(records);
        }
    }
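
    The shutdown handshake used by ListenerInvoker (a volatile active flag plus a one-shot latch) can be tried out on its own. The sketch below is an assumption-laden simplification: the records are strings, "processing" only appends to a list, and the stop method and runOnce helper are invented for the demo.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class InvokerSketch implements Runnable {

    private final BlockingQueue<String> queue;
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final CountDownLatch exitLatch = new CountDownLatch(1);
    private volatile boolean active = true;

    InvokerSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (active) {
                try {
                    // Timed poll so the loop re-checks the active flag regularly.
                    String batch = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (active && batch != null) {
                        processed.add(batch); // stands in for invokeListener(records)
                    }
                } catch (InterruptedException e) {
                    if (!active) {
                        Thread.currentThread().interrupt();
                    }
                    // an interrupt while still active is ignored, as in the listing
                }
            }
        } finally {
            active = false;
            exitLatch.countDown(); // counted down exactly once, when the loop exits
        }
    }

    // Hypothetical stop method: clear the flag, then wait for the loop to exit.
    boolean stop(long timeoutMs) throws InterruptedException {
        active = false;
        return exitLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Demo helper: runs an invoker, feeds it the batches, stops it, returns what it processed.
    static List<String> runOnce(String... batches) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        InvokerSketch invoker = new InvokerSketch(queue);
        new Thread(invoker, "invoker-sketch").start();
        try {
            for (String batch : batches) {
                queue.put(batch);
            }
            long deadline = System.currentTimeMillis() + 5_000;
            while (invoker.processed.size() < batches.length && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            if (!invoker.stop(2_000)) {
                throw new IllegalStateException("invoker did not exit in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return invoker.processed;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("batch-1", "batch-2"));
    }
}
```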

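    In the ListenerInvoker listing, pending records are taken from the blocking queue and consumed through an iterator. Depending on the configured listener type, a different listener executes the onMessage method (the onMessage method of the user's MessageListener implementation, which holds the user's own consumption logic).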

    private void invokeRecordListener(final ConsumerRecords<K, V> records) {
        Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
        while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
            final ConsumerRecord<K, V> record = iterator.next();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Processing " + record);
            }
            try {
                if (this.acknowledgingMessageListener != null) {
                    // the core call: the user's AcknowledgingMessageListener.onMessage implementation
                    this.acknowledgingMessageListener.onMessage(record,
                            this.isAnyManualAck
                                    ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
                                    : null);
                }
                else {
                    // the core call: the user's MessageListener.onMessage implementation
                    this.listener.onMessage(record);
                }
                if (!this.isAnyManualAck && !this.autoCommit) {
                    this.acks.add(record);
                }
            }
            catch (Exception e) {
                if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                    this.acks.add(record);
                }
                try {
                    this.errorHandler.handle(e, record);
                }
                catch (Exception ee) {
                    this.logger.error("Error handler threw an exception", ee);
                }
                catch (Error er) { //NOSONAR
                    this.logger.error("Error handler threw an error", er);
                    throw er;
                }
            }
        }
    }
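
    Which records end up in acks depends on three flags. The sketch below isolates that decision under the assumption that auto-commit is off; records are strings, the listener is a plain Consumer, and the errors list stands in for the error handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class AckCollectionSketch {

    // Returns the records that would be queued for an offset commit (auto-commit assumed off).
    static List<String> process(List<String> records, Consumer<String> listener,
            boolean manualAck, boolean ackOnError, List<String> errors) {
        List<String> acks = new ArrayList<>();
        for (String record : records) {
            try {
                listener.accept(record);
                if (!manualAck) {
                    acks.add(record); // success: the container acknowledges on the user's behalf
                }
            } catch (RuntimeException e) {
                if (ackOnError) {
                    acks.add(record); // failure is still acknowledged, so the record is not redelivered
                }
                errors.add(record + ": " + e.getMessage()); // stands in for errorHandler.handle(e, record)
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        Consumer<String> listener = record -> {
            if (record.equals("b")) {
                throw new IllegalStateException("boom");
            }
        };
        List<String> errors = new ArrayList<>();
        System.out.println(process(Arrays.asList("a", "b", "c"), listener, false, false, errors)); // [a, c]
        System.out.println(errors); // [b: boom]
    }
}
```

    As in the listing, a failing record does not stop the iteration: the error handler is called and the loop moves on to the next record.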

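    2. No partitions specified: enter the spin loop

    // 1) fetch records
    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
    // 2) auto-commit enabled: invoke the listener on the current thread
    invokeListener(records);
    // 3) otherwise hand the records to the buffer queue
    sendToListener(records);

    1) On each poll, the consumer tries to use the last consumed offset as the starting offset and fetches sequentially. The last consumed offset can be set manually through seek(TopicPartition, long), or is set automatically to the last committed offset for the subscribed partitions.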

    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // Before returning the fetched records, send the next round of fetches without
                    // blocking on their responses, so fetching is pipelined with the user's processing.
                    fetcher.sendFetches();
                    // The consumed position has already been updated, so no wakeup or other error
                    // may be triggered before the fetched records are returned.
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else // run the consumer interceptors if any are configured
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
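
    The do/while in poll is a general "retry until the time budget is spent" pattern. Below is a minimal sketch of just that pattern, with the clock and the single-attempt function passed in as parameters (both are illustrative stand-ins for time.milliseconds() and pollOnce) so that it can be exercised without a broker.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

final class PollLoopSketch {

    // Calls pollOnce with the remaining budget until it returns data or the timeout is used up.
    static <T> List<T> poll(long timeoutMs, LongSupplier clock, LongFunction<List<T>> pollOnce) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        long start = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty()) {
                return records;
            }
            long elapsed = clock.getAsLong() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // A source that never has data: poll gives up once the 50 ms budget is spent.
        List<String> result = poll(50, System::currentTimeMillis, remaining -> {
            try {
                Thread.sleep(Math.min(remaining, 20));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.<String>emptyList();
        });
        System.out.println(result.isEmpty());
    }
}
```

    Because it is a do/while, a timeout of 0 still makes exactly one attempt, which is why poll(0) can return records that have already been fetched.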

    pollOnce:

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // For partitions whose fetch position is unknown, update it: the coordinator
        // refreshes the committed offsets and the fetcher resets the fetch positions.
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // return records that have already been fetched
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send fetch requests
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
        // perform network I/O to fetch data
        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

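    Going any further down reaches the network I/O layer, which is not covered here. A link will be added once the article on the Kafka wire protocol is written.

    2) invokeListener works the same as in branch 1: it ultimately calls the onMessage method of the user's MessageListener implementation, so it is not repeated here.

    3) sendToListener puts the records into the buffer queue LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess. When are they consumed after being queued? The run method of ListenerInvoker calls recordsToProcess.poll and consumes them.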
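
    The body of sendToListener is not shown in this article. The run() listing only reveals its contract: a true return means the records were not handed off, so the consumer is paused and the batch is kept in unsent. The sketch below is one way to get that contract, assuming a bounded queue and a timed offer; the class name, the capacity and the pauseAfterMs parameter are all illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class HandOffSketch {

    private final BlockingQueue<String> recordsToProcess;
    private final long pauseAfterMs;

    HandOffSketch(int capacity, long pauseAfterMs) {
        this.recordsToProcess = new LinkedBlockingQueue<>(capacity);
        this.pauseAfterMs = pauseAfterMs;
    }

    // Returns true when the batch could NOT be queued in time, i.e. the caller
    // should pause the consumer and retry the same batch later.
    boolean sendToListener(String batch) {
        try {
            return !recordsToProcess.offer(batch, pauseAfterMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
    }

    // What the invoker thread would do; returns null when the queue is empty.
    String takeForListener() {
        return recordsToProcess.poll();
    }

    public static void main(String[] args) {
        HandOffSketch handOff = new HandOffSketch(1, 50);
        System.out.println(handOff.sendToListener("batch-1")); // false: queued
        System.out.println(handOff.sendToListener("batch-2")); // true: queue full, caller should pause
        System.out.println(handOff.takeForListener());         // batch-1
        System.out.println(handOff.sendToListener("batch-2")); // false: queued on retry
    }
}
```

    Pausing instead of blocking lets the polling thread keep calling poll, which is what the "avoid group management rebalance due to a slow consumer" comment in run() is about.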

  • Original article: https://www.cnblogs.com/dennyzhangdd/p/7759876.html