zoukankan      html  css  js  c++  java
  • 【Java应用】RocketMQ线上问题:重复消费和线程飙升

    问题

    1. RocketMQ重复消费问题
    2. RocketMQ线程过高问题

    线上场景

    场景一:重复消费

    场景:生产有这么一种场景,我们在RocketMQ中对一个topic创建了16个tag,不同总类的信息放到不同的tag中,在消费端每个tag对应三个线程组成group去消费消息。消费服务在线上是集群部署,是使用docker进行部署的。

    问题1:tag中的消息发生了稳定的重复性消费。

    排查:首先我们发现重复消费的次数和线上集群的台数是一致的,所以这个时候就去查看配置信息,然后发现没有配置错误,在多方试错的情况下,最后在rocketmq的监控页面发现ClientId获取的IP竟然是一样的。
    clientID

    这时候阅读RocketMQ的源码,我们在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl的start方法中看到下面这行代码

    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    

    点进去看到getAndCreateMQClientInstance

    buildMQClientId

    从上面图中的代码以及我们看到的RocketMQ的监控图可以明白一点,rocketmq在docker部署中通过getLocalAddress方法获取出来的IP是一样,如果你不设置instanceName和unitName,那么多台机器上面使用的就是一个instance。这样可能会造成重复消费,那么为什么instanceName一致就会造成重复消费呢?接着往下看

    1. org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

      start方法

      // 一个JVM中的所有消费组、生产者持有同一个MQClientInstance,MQClientInstance只会启动一次
      mQClientFactory.start();
      
    2. org.apache.rocketmq.client.impl.factory.MQClientInstance

      public void start() throws MQClientException {
      	...
      	this.rebalanceService.start();
      	...
      }
      
    3. org.apache.rocketmq.client.impl.consumer.RebalanceService

      @Override
      public void run() {
          log.info(this.getServiceName() + " service started");
      
          while (!this.isStopped()) {
              // 该线程默认20s执行一次rebalance
              this.waitForRunning(waitInterval);
              this.mqClientFactory.doRebalance();
          }
      
          log.info(this.getServiceName() + " service end");
      }
      
    4. org.apache.rocketmq.client.impl.factory.MQClientInstance

      public void doRebalance() {
          // 遍历注册的所有已经注册的消费者,对消费者执行rebalance
          for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
              MQConsumerInner impl = entry.getValue();
              if (impl != null) {
                  try {
                      impl.doRebalance();
                  } catch (Throwable e) {
                      log.error("doRebalance exception", e);
                  }
              }
          }
      }
      
    5. org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

      @Override
      public void doRebalance() {
          if (!this.pause) {
              // 每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象
              this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
          }
      }
      
    6. org.apache.rocketmq.client.impl.consumer.RebalanceImpl

      public void doRebalance(final boolean isOrder) {
          Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
          if (subTable != null) {
              // 遍历订阅信息对每个主题的队列进行重新负载
              for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                  final String topic = entry.getKey();
                  try {
                      this.rebalanceByTopic(topic, isOrder);
                  } catch (Throwable e) {
                      if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                          log.warn("rebalanceByTopic Exception", e);
                      }
                  }
              }
          }
      
          this.truncateMessageQueueNotMyTopic();
      }
      
    7. org.apache.rocketmq.client.impl.consumer.RebalanceImpl

      private void rebalanceByTopic(final String topic, final boolean isOrder) {
          switch (messageModel) {
              case BROADCASTING: {
                  ...
              }
              case CLUSTERING: {
                  // 从主题订阅信息缓存表中获取该topic的队列信息
                  Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                  // topic分布在多个broker上,但是每个broker都存有所有的消费者信息,因为消费者启动的时候需要像所有的broker注册信息
                  // 这里获取的是当前topic下消费者组里所有的消费者客户端ID
                  List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      
                  // 对cidAll和mqAll排序,确保所有消费者结果一致,这样一个消费队列就只能被一个消费者分配
                  Collections.sort(mqAll);
                  Collections.sort(cidAll);
                  // 默认为AllocateMessageQueueAveragely
                  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                  allocateResult = strategy.allocate(//
                          this.consumerGroup, //
                          this.mQClientFactory.getClientId(), //
                          mqAll, //
                          cidAll);
              }
          }
      }
      

    我们知道RocketMQ不管push还是pull其实底层的实现都是pull,我们看到最后发现他会根据topic和group从broker那里获取出来所有cunsumer client,如果clientId相同,那么在broker上面只有一个,获取出来的是一样,那么拉取的MessageQueue就是一样的。于是我们就可以给consumer的instanceName设置一个随机值

    场景二:线程剧增

    问题2: 设置完随机值以后确实不重复消费了,但是发现服务器的线程飙升。

    排查:jstack下来线上日志,发现里面有很多netty以及rocketmq相关的线程,于是我们再次进到源码中。这里我就不详细跟踪代码了

    start

    我们从这里可以看到consumer端起了很多线程,报错与broker建立链接的线程,这里面会级联产生多个netty相关的线程,然后是定时任务的线程,以及拉取消息的线程和负载均衡的线程。于是我们把instanceName的随机性与服务绑定,而不是与tag绑定,这样就可以做到一台服务器以他instance

    结论

    对于同一个jvm实例我们只需要一个instance实例即可,对于多个jvm我们要区分,不然集群消费会隐式的变为广播消费

    参考

    五种队列分配策略

  • 相关阅读:
    Towards Life-Long Autonomy of Mobile Robots Through Feature-Based Change Detection
    Magnetic field constraints and sequence-based matching for indoor pose graph SLAM
    3D Image-based Indoor Localization Joint With WiFi Positioning
    Online Probabilistic Change Detection in Feature-Based Maps
    Detectron2--(1)
    detectron2 + ubuntu + cpu
    出去上网-ubuntu-ss
    深度学习入门4
    IDM添加代理
    [转载]clover引导黑苹果icloud已达到账户数量限制解决方法
  • 原文地址:https://www.cnblogs.com/colin-xun/p/13740624.html
Copyright © 2011-2022 走看看