zoukankan      html  css  js  c++  java
  • spring kafka consumer原理解析二

    前言:

     在 spring kafka consumer原理解析一 里谈到了spring kafka 容器的加载过程,即每个 @KafkaListenner 会对应加载一个 CurrentMessageListennerContainer(一个多线程 kafka容器),而在 CurrentMessageListennerContainer 里面其实是创建了多个KafkaMessageListennerContainer (一个单线程 kafka 容器),到底创建几个是有 currency 这个参数指定。最终是在 kafkaMessageListennerContainer 创建 ListenerConsumer (一个线程类)调用 KafkaConsumer API 进行 kafka 的操作。然而通过调用 Kafka API 是如何进行操作的呢? see ↓

    解析:

    (1)ListenerConsumer (这个线程类是 Spring kafka 最终实现消费者代码的核心所在)

         private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
         @Override
    public void run() { if (this.theListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.theListener).registerSeekCallback(this); } this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) {
              // 初始化分区:也就是通过 seekToEnd(kafka consumer API)获取最新的偏移量,然后将其保存起来 initPartitionsIfNeeded();
    // we start the invoker here as there will be no rebalance calls to // trigger it, but only if the container is not set to autocommit // otherwise we will process records on a separate thread
              // 判断是否是自动提交,如果不是自动提交则新开一个线程将数据(records)作为参数推至带有 @KafkaListener 方法中,然后手动提交为什么要新开一个线程呢?继续往下探索 ↓ if (!this.autoCommit) { startInvoker(); } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; while (isRunning()) { try {
                // 如果是手动提交则判断现在是否要进行 commit
    if (!this.autoCommit) { processCommits(); }
                // 获取最新的偏移量 processSeeks();
    if (this.logger.isTraceEnabled()) { this.logger.trace("Polling (paused=" + this.paused + ")..."); }
                // 拉取数据 ConsumerRecords
    <K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout()); if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); } if (records != null && records.count() > 0) { if (this.containerProperties.getIdleEventInterval() != null) { lastReceive = System.currentTimeMillis(); } // if the container is set to auto-commit, then execute in the // same thread // otherwise send to the buffering queue
                   // 如果是自动提交则直接在这里将数据推送至带有 @KafkaListener 的方法中,相比手动提交不同的是:手动提交时新开了一个线程处理,而自动提交是在主线程执行这段逻辑。
                   // 为什么呢?这里说得很明白,因为设置为手动提交的时候,spring kafka 会把拉取下来的数据线放入一个队列中缓存起来,而推送线程是每次从那个缓存队列中获取数据推送过去,
                   // 而自动提交则是每次获取一点数据就直接推送过去 if (this.autoCommit) { invokeListener(records); } else {
                     // 这里是将数据存至缓存队列,如果缓存队列满了,则会调用 pause 暂停分区,即会停止拉取数据,直至缓存队列达到未满状态
    if (sendToListener(records)) { if (this.assignedPartitions != null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused = true; this.unsent = records; } } } } else { if (this.containerProperties.getIdleEventInterval() != null) { long now = System.currentTimeMillis(); if (now > lastReceive + this.containerProperties.getIdleEventInterval() && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) { publishIdleContainerEvent(now - lastReceive); lastAlertAt = now; if (this.theListener instanceof ConsumerSeekAware) { seekPartitions(getAssignedPartitions(), true); } } } } this.unsent = checkPause(this.unsent); } catch (WakeupException e) { this.unsent = checkPause(this.unsent); } catch (Exception e) { if (this.containerProperties.getGenericErrorHandler() != null) { this.containerProperties.getGenericErrorHandler().handle(e, null); } else { this.logger.error("Container exception", e); } } } if (this.listenerInvokerFuture != null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close(); if (this.logger.isInfoEnabled()) { this.logger.info("Consumer stopped"); } } }
  • 相关阅读:
    基于微信红包插件的原理实现android任何APP自动发送评论(已开源)
    人家为撩妹就鼓捣个网页,我做了个约炮APP(已开源)
    android加固签名工具(源码下载)
    如何优雅的写一篇安利文-以Sugar ORM为例
    写给独立开发兄弟共勉-寂寞是19首诗和2首悲歌
    我开源了一个ios应用,你们拿去随便玩
    android用欢迎界面加载运行环境
    用c#操作Mongodb(附demo)
    sql:除非另外还指定了 TOP 或 FOR XML,否则,ORDER BY 子句在视图、内联函数、派生表、子查询
    怎样阻止Linux服务器执行rm -rf /*命令
  • 原文地址:https://www.cnblogs.com/lzj123/p/10749377.html
Copyright © 2011-2022 走看看