zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——生产者启动流程

      RocketMQ(4.8.0)——生产者启动流程

    DefaultMQProducer(D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientproducerDefaultMQProducer.java )是RocketMQ中默认的生产者实现,DefaultMQProducer 的类之间的继承关系如图:

    从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer

    • DefaultMQProducer: 我们常用的生产者。
    • TransactionMQProducer:继承自 DefaultMQProducer,并支持事务消息。

    以下是一些核心属性:

      namesrvAddr:继承自 Client,表示 RocketMQ集群的Namesrv地址,如果是多个则用分号分开。比如:127.0.0.1:9876。

      clientIP:使用客户端程序所在机器的IP地址。支持IPv4 和 IPv6,IPv4排除了本地的环回地址(127.0.x.x) 和私有内网地址。这里需要注意的是如果Client运行在Docker容器中,获取的IP地址是容器所在的IP地址,而非宿主机的IP地址。

      instanceName:实例名,每个实例都需要取唯一的名字,因为有时我们会在同一个机器上部署多个程序进程,如果名字有重复就会导致启动失败。

      vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。

      clientChannelEnabled:客户端回调线程数。该参数表示Netty通信层回调线程的个数 ,默认值availableProcessors()表示当前 CPU 的有效个数。

      pollNameServerInterval:获取Topic路由信息的间隔市场,单位为ms,默认为30s。

      hearbeatBrokerInterval:与Broker心跳间隔的时长,单位为ms,默认为30s。

      defaultMQProducerImpl:默认生产者的实现类,其中封装了Broker的各种API(自动及关闭生产者的接口)。如果你想要自己实现一个生产者,可以添加一个新的实现,保持DefaultMQProducer对外接口不变,用户完全没有感知。

      producerGroup:生产者组名,这是一个必须传递的参数。RocketMQ-way表示同一个生产者组中的生产者实例行为需要一致。

      sendMsgTimeout:发送超时时间,单位为ms。

      compressMsgBodyOverHowmuch:消息体的容量上限,超过该上限时消息体会通过ZIP进行压缩,该值默认为4MB。

      retryTimesWhenSendFailed:同步发送失败后重试的次数。默认为2次,也就是说一共3次发送机会。

      retryTimesWhenSendAsyncFailed:异步发送失败后重试的次数。默认为2次。异步重试是有条件的重试,并不是每次发送失败后都重试,比如:responseFuture.isSendRequestOK()、responseFuture.isTimeout() 等等。

     1     private void sendMessageAsync(
     2         final String addr,
     3         final String brokerName,
     4         final Message msg,
     5         final long timeoutMillis,
     6         final RemotingCommand request,
     7         final SendCallback sendCallback,
     8         final TopicPublishInfo topicPublishInfo,
     9         final MQClientInstance instance,
    10         final int retryTimesWhenSendFailed,
    11         final AtomicInteger times,
    12         final SendMessageContext context,
    13         final DefaultMQProducerImpl producer
    14     ) throws InterruptedException, RemotingException {
    15         final long beginStartTime = System.currentTimeMillis();
    16         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    17             @Override
    18             public void operationComplete(ResponseFuture responseFuture) {
    19                 long cost = System.currentTimeMillis() - beginStartTime;
    20                 RemotingCommand response = responseFuture.getResponseCommand();
    21                 if (null == sendCallback && response != null) {
    22 
    23                     try {
    24                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
    25                         if (context != null && sendResult != null) {
    26                             context.setSendResult(sendResult);
    27                             context.getProducer().executeSendMessageHookAfter(context);
    28                         }
    29                     } catch (Throwable e) {
    30                     }
    31 
    32                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
    33                     return;
    34                 }
    35 
    36                 if (response != null) {
    37                     try {
    38                         SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
    39                         assert sendResult != null;
    40                         if (context != null) {
    41                             context.setSendResult(sendResult);
    42                             context.getProducer().executeSendMessageHookAfter(context);
    43                         }
    44 
    45                         try {
    46                             sendCallback.onSuccess(sendResult);
    47                         } catch (Throwable e) {
    48                         }
    49 
    50                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
    51                     } catch (Exception e) {
    52                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
    53                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
    54                             retryTimesWhenSendFailed, times, e, context, false, producer);
    55                     }
    56                 } else {
    57                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
    58                     if (!responseFuture.isSendRequestOK()) {
    59                         MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
    60                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
    61                             retryTimesWhenSendFailed, times, ex, context, true, producer);
    62                     } else if (responseFuture.isTimeout()) {
    63                         MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
    64                             responseFuture.getCause());
    65                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
    66                             retryTimesWhenSendFailed, times, ex, context, true, producer);
    67                     } else {
    68                         MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
    69                         onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
    70                             retryTimesWhenSendFailed, times, ex, context, true, producer);
    71                     }
    72                 }
    73             }
    74         });
    75     }
    retryTimesWhenSendAsyncFailed

    以下是一些核心方法:

      start():这是启动整个生产者实例的入口,主要负责校验产生者的配置参数的是否正确,并启动通信通道、各种定时计划任务、Pull服务、Rebalance服务、注册生产者到Broker等操作。

      shudown():关闭本地已注册的生产者,关闭已注册到Broker的客户端。

      fetchPublishMessageQueues(Topic):获取一个Topic有哪些Queue。在发送消息、Pull消息时都需要调用。

      send(Message msg):同步发送普通消息。

      send(Message msg, long timeout):同步发送普通消息(超时设置)。

      send(Message msg, SendCallback sendCallback):异步发送普通消息。

      send(Message msg, SendCallback sendCallback, long timeout):异步发送消息,并指定回调方法和超时时间。

      sendOneway(Message msg):发送单向消息。只负责发送消息,不管发送结果。

      send(Message msg, MessageQueue mq):同步向指定队列发送消息。
      send(Message msg, MessageQueue mq, long timeout):同步向指定队列发送消息(超时设置)。

      send(Message msg, MessageQueue mq, SendCallback sendCallback):异步发送消息到指定队列。

      send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout):异步发送消息到指定队列(超时设置)。

      send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback):自定义消息发送到指定队列。通过实现MessageQueueSelector接口来选择将发消息发送到哪个队列。

      send(Collection<Message> msgs):批量发送消息。

    下面介绍两个核心管理接口:

      createTopic(String key, String newTopic, int queueNum):在broker上创建指定的Topic。

      viewMessage(String offsetMsgId):根据消息id查询消息内容。

       生产者启动的流程比消费者启动的流程更为简单,一般用户使用 DefaultMQProducer的构造函数构造一个生产者实例,并设置各种参数。比如Namesrv地址、生产者组名等,调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动。

      MQClientInstance 实例与clientId是一一对应的,而clientId是由clientIP、instanceName及unitName构成的。一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,就会只创建一个MQClientInstance实例,具体实现代码如下:

     1     public String buildMQClientId() {
     2         StringBuilder sb = new StringBuilder();
     3         sb.append(this.getClientIP());
     4 
     5         sb.append("@");
     6         sb.append(this.getInstanceName());
     7         if (!UtilAll.isBlank(this.unitName)) {
     8             sb.append("@");
     9             sb.append(this.unitName);
    10         }
    11 
    12         return sb.toString();
    13     }

      MQClientInstance 实例的功能是管理本实例中全部生产者与消费者的生产和消费行为,代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplfactoryMQClientInstance.java,相关如下:

       1 public class MQClientInstance {
       2     private final static long LOCK_TIMEOUT_MILLIS = 3000;
       3     private final InternalLogger log = ClientLogger.getLog();
       4     private final ClientConfig clientConfig;
       5     private final int instanceIndex;
       6     private final String clientId;
       7     private final long bootTimestamp = System.currentTimeMillis();
       8     private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
       9     private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
      10     private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
      11     private final NettyClientConfig nettyClientConfig;
      12     private final MQClientAPIImpl mQClientAPIImpl;
      13     private final MQAdminImpl mQAdminImpl;
      14     private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
      15     private final Lock lockNamesrv = new ReentrantLock();
      16     private final Lock lockHeartbeat = new ReentrantLock();
      17     private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
      18         new ConcurrentHashMap<String, HashMap<Long, String>>();
      19     private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
      20         new ConcurrentHashMap<String, HashMap<String, Integer>>();
      21     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
      22         @Override
      23         public Thread newThread(Runnable r) {
      24             return new Thread(r, "MQClientFactoryScheduledThread");
      25         }
      26     });
      27     private final ClientRemotingProcessor clientRemotingProcessor;
      28     private final PullMessageService pullMessageService;
      29     private final RebalanceService rebalanceService;
      30     private final DefaultMQProducer defaultMQProducer;
      31     private final ConsumerStatsManager consumerStatsManager;
      32     private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
      33     private ServiceState serviceState = ServiceState.CREATE_JUST;
      34     private Random random = new Random();
      35 
      36     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
      37         this(clientConfig, instanceIndex, clientId, null);
      38     }
      39 
      40     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
      41         this.clientConfig = clientConfig;
      42         this.instanceIndex = instanceIndex;
      43         this.nettyClientConfig = new NettyClientConfig();
      44         this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
      45         this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
      46         this.clientRemotingProcessor = new ClientRemotingProcessor(this);
      47         this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
      48 
      49         if (this.clientConfig.getNamesrvAddr() != null) {
      50             this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
      51             log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
      52         }
      53 
      54         this.clientId = clientId;
      55 
      56         this.mQAdminImpl = new MQAdminImpl(this);
      57 
      58         this.pullMessageService = new PullMessageService(this);
      59 
      60         this.rebalanceService = new RebalanceService(this);
      61 
      62         this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
      63         this.defaultMQProducer.resetClientConfig(clientConfig);
      64 
      65         this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
      66 
      67         log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
      68             this.instanceIndex,
      69             this.clientId,
      70             this.clientConfig,
      71             MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
      72     }
      73 
      74     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
      75         TopicPublishInfo info = new TopicPublishInfo();
      76         info.setTopicRouteData(route);
      77         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
      78             String[] brokers = route.getOrderTopicConf().split(";");
      79             for (String broker : brokers) {
      80                 String[] item = broker.split(":");
      81                 int nums = Integer.parseInt(item[1]);
      82                 for (int i = 0; i < nums; i++) {
      83                     MessageQueue mq = new MessageQueue(topic, item[0], i);
      84                     info.getMessageQueueList().add(mq);
      85                 }
      86             }
      87 
      88             info.setOrderTopic(true);
      89         } else {
      90             List<QueueData> qds = route.getQueueDatas();
      91             Collections.sort(qds);
      92             for (QueueData qd : qds) {
      93                 if (PermName.isWriteable(qd.getPerm())) {
      94                     BrokerData brokerData = null;
      95                     for (BrokerData bd : route.getBrokerDatas()) {
      96                         if (bd.getBrokerName().equals(qd.getBrokerName())) {
      97                             brokerData = bd;
      98                             break;
      99                         }
     100                     }
     101 
     102                     if (null == brokerData) {
     103                         continue;
     104                     }
     105 
     106                     if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
     107                         continue;
     108                     }
     109 
     110                     for (int i = 0; i < qd.getWriteQueueNums(); i++) {
     111                         MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
     112                         info.getMessageQueueList().add(mq);
     113                     }
     114                 }
     115             }
     116 
     117             info.setOrderTopic(false);
     118         }
     119 
     120         return info;
     121     }
     122 
     123     public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
     124         Set<MessageQueue> mqList = new HashSet<MessageQueue>();
     125         List<QueueData> qds = route.getQueueDatas();
     126         for (QueueData qd : qds) {
     127             if (PermName.isReadable(qd.getPerm())) {
     128                 for (int i = 0; i < qd.getReadQueueNums(); i++) {
     129                     MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
     130                     mqList.add(mq);
     131                 }
     132             }
     133         }
     134 
     135         return mqList;
     136     }
     137 
     138     public void start() throws MQClientException {
     139 
     140         synchronized (this) {
     141             switch (this.serviceState) {
     142                 case CREATE_JUST:
     143                     this.serviceState = ServiceState.START_FAILED;
     144                     // If not specified,looking address from name server
     145                     if (null == this.clientConfig.getNamesrvAddr()) {
     146                         this.mQClientAPIImpl.fetchNameServerAddr();
     147                     }
     148                     // Start request-response channel
     149                     this.mQClientAPIImpl.start();
     150                     // Start various schedule tasks
     151                     this.startScheduledTask();
     152                     // Start pull service
     153                     this.pullMessageService.start();
     154                     // Start rebalance service
     155                     this.rebalanceService.start();
     156                     // Start push service
     157                     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
     158                     log.info("the client factory [{}] start OK", this.clientId);
     159                     this.serviceState = ServiceState.RUNNING;
     160                     break;
     161                 case START_FAILED:
     162                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
     163                 default:
     164                     break;
     165             }
     166         }
     167     }
     168 
     169     private void startScheduledTask() {
     170         if (null == this.clientConfig.getNamesrvAddr()) {
     171             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     172 
     173                 @Override
     174                 public void run() {
     175                     try {
     176                         MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
     177                     } catch (Exception e) {
     178                         log.error("ScheduledTask fetchNameServerAddr exception", e);
     179                     }
     180                 }
     181             }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
     182         }
     183 
     184         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     185 
     186             @Override
     187             public void run() {
     188                 try {
     189                     MQClientInstance.this.updateTopicRouteInfoFromNameServer();
     190                 } catch (Exception e) {
     191                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
     192                 }
     193             }
     194         }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
     195 
     196         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     197 
     198             @Override
     199             public void run() {
     200                 try {
     201                     MQClientInstance.this.cleanOfflineBroker();
     202                     MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
     203                 } catch (Exception e) {
     204                     log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
     205                 }
     206             }
     207         }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
     208 
     209         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     210 
     211             @Override
     212             public void run() {
     213                 try {
     214                     MQClientInstance.this.persistAllConsumerOffset();
     215                 } catch (Exception e) {
     216                     log.error("ScheduledTask persistAllConsumerOffset exception", e);
     217                 }
     218             }
     219         }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
     220 
     221         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     222 
     223             @Override
     224             public void run() {
     225                 try {
     226                     MQClientInstance.this.adjustThreadPool();
     227                 } catch (Exception e) {
     228                     log.error("ScheduledTask adjustThreadPool exception", e);
     229                 }
     230             }
     231         }, 1, 1, TimeUnit.MINUTES);
     232     }
     233 
     234     public String getClientId() {
     235         return clientId;
     236     }
     237 
     238     public void updateTopicRouteInfoFromNameServer() {
     239         Set<String> topicList = new HashSet<String>();
     240 
     241         // Consumer
     242         {
     243             Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     244             while (it.hasNext()) {
     245                 Entry<String, MQConsumerInner> entry = it.next();
     246                 MQConsumerInner impl = entry.getValue();
     247                 if (impl != null) {
     248                     Set<SubscriptionData> subList = impl.subscriptions();
     249                     if (subList != null) {
     250                         for (SubscriptionData subData : subList) {
     251                             topicList.add(subData.getTopic());
     252                         }
     253                     }
     254                 }
     255             }
     256         }
     257 
     258         // Producer
     259         {
     260             Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
     261             while (it.hasNext()) {
     262                 Entry<String, MQProducerInner> entry = it.next();
     263                 MQProducerInner impl = entry.getValue();
     264                 if (impl != null) {
     265                     Set<String> lst = impl.getPublishTopicList();
     266                     topicList.addAll(lst);
     267                 }
     268             }
     269         }
     270 
     271         for (String topic : topicList) {
     272             this.updateTopicRouteInfoFromNameServer(topic);
     273         }
     274     }
     275 
     276     /**
     277      * @param offsetTable
     278      * @param namespace
     279      * @return newOffsetTable
     280      */
     281     public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
     282         HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
     283         if (StringUtils.isNotEmpty(namespace)) {
     284             for (Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
     285                 MessageQueue queue = entry.getKey();
     286                 queue.setTopic(NamespaceUtil.withoutNamespace(queue.getTopic(), namespace));
     287                 newOffsetTable.put(queue, entry.getValue());
     288             }
     289         } else {
     290             newOffsetTable.putAll(offsetTable);
     291         }
     292 
     293         return newOffsetTable;
     294     }
     295 
     296     /**
     297      * Remove offline broker
     298      */
     299     private void cleanOfflineBroker() {
     300         try {
     301             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
     302                 try {
     303                     ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
     304 
     305                     Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
     306                     while (itBrokerTable.hasNext()) {
     307                         Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
     308                         String brokerName = entry.getKey();
     309                         HashMap<Long, String> oneTable = entry.getValue();
     310 
     311                         HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
     312                         cloneAddrTable.putAll(oneTable);
     313 
     314                         Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
     315                         while (it.hasNext()) {
     316                             Entry<Long, String> ee = it.next();
     317                             String addr = ee.getValue();
     318                             if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
     319                                 it.remove();
     320                                 log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
     321                             }
     322                         }
     323 
     324                         if (cloneAddrTable.isEmpty()) {
     325                             itBrokerTable.remove();
     326                             log.info("the broker[{}] name's host is offline, remove it", brokerName);
     327                         } else {
     328                             updatedTable.put(brokerName, cloneAddrTable);
     329                         }
     330                     }
     331 
     332                     if (!updatedTable.isEmpty()) {
     333                         this.brokerAddrTable.putAll(updatedTable);
     334                     }
     335                 } finally {
     336                     this.lockNamesrv.unlock();
     337                 }
     338         } catch (InterruptedException e) {
     339             log.warn("cleanOfflineBroker Exception", e);
     340         }
     341     }
     342 
     343     public void checkClientInBroker() throws MQClientException {
     344         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     345 
     346         while (it.hasNext()) {
     347             Entry<String, MQConsumerInner> entry = it.next();
     348             Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
     349             if (subscriptionInner == null || subscriptionInner.isEmpty()) {
     350                 return;
     351             }
     352 
     353             for (SubscriptionData subscriptionData : subscriptionInner) {
     354                 if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
     355                     continue;
     356                 }
     357                 // may need to check one broker every cluster...
     358                 // assume that the configs of every broker in cluster are the the same.
     359                 String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
     360 
     361                 if (addr != null) {
     362                     try {
     363                         this.getMQClientAPIImpl().checkClientInBroker(
     364                             addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
     365                         );
     366                     } catch (Exception e) {
     367                         if (e instanceof MQClientException) {
     368                             throw (MQClientException) e;
     369                         } else {
     370                             throw new MQClientException("Check client in broker error, maybe because you use "
     371                                 + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
     372                                 + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
     373                                 "have use the new features which are not supported by server, please check the log!", e);
     374                         }
     375                     }
     376                 }
     377             }
     378         }
     379     }
     380 
     381     public void sendHeartbeatToAllBrokerWithLock() {
     382         if (this.lockHeartbeat.tryLock()) {
     383             try {
     384                 this.sendHeartbeatToAllBroker();
     385                 this.uploadFilterClassSource();
     386             } catch (final Exception e) {
     387                 log.error("sendHeartbeatToAllBroker exception", e);
     388             } finally {
     389                 this.lockHeartbeat.unlock();
     390             }
     391         } else {
     392             log.warn("lock heartBeat, but failed. [{}]", this.clientId);
     393         }
     394     }
     395 
     396     private void persistAllConsumerOffset() {
     397         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     398         while (it.hasNext()) {
     399             Entry<String, MQConsumerInner> entry = it.next();
     400             MQConsumerInner impl = entry.getValue();
     401             impl.persistConsumerOffset();
     402         }
     403     }
     404 
     405     public void adjustThreadPool() {
     406         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     407         while (it.hasNext()) {
     408             Entry<String, MQConsumerInner> entry = it.next();
     409             MQConsumerInner impl = entry.getValue();
     410             if (impl != null) {
     411                 try {
     412                     if (impl instanceof DefaultMQPushConsumerImpl) {
     413                         DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
     414                         dmq.adjustThreadPool();
     415                     }
     416                 } catch (Exception e) {
     417                 }
     418             }
     419         }
     420     }
     421 
     422     public boolean updateTopicRouteInfoFromNameServer(final String topic) {
     423         return updateTopicRouteInfoFromNameServer(topic, false, null);
     424     }
     425 
     426     private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
     427         Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
     428         while (it.hasNext()) {
     429             Entry<String, TopicRouteData> entry = it.next();
     430             TopicRouteData topicRouteData = entry.getValue();
     431             List<BrokerData> bds = topicRouteData.getBrokerDatas();
     432             for (BrokerData bd : bds) {
     433                 if (bd.getBrokerAddrs() != null) {
     434                     boolean exist = bd.getBrokerAddrs().containsValue(addr);
     435                     if (exist)
     436                         return true;
     437                 }
     438             }
     439         }
     440 
     441         return false;
     442     }
     443 
     444     private void sendHeartbeatToAllBroker() {
     445         final HeartbeatData heartbeatData = this.prepareHeartbeatData();
     446         final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
     447         final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
     448         if (producerEmpty && consumerEmpty) {
     449             log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
     450             return;
     451         }
     452 
     453         if (!this.brokerAddrTable.isEmpty()) {
     454             long times = this.sendHeartbeatTimesTotal.getAndIncrement();
     455             Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
     456             while (it.hasNext()) {
     457                 Entry<String, HashMap<Long, String>> entry = it.next();
     458                 String brokerName = entry.getKey();
     459                 HashMap<Long, String> oneTable = entry.getValue();
     460                 if (oneTable != null) {
     461                     for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
     462                         Long id = entry1.getKey();
     463                         String addr = entry1.getValue();
     464                         if (addr != null) {
     465                             if (consumerEmpty) {
     466                                 if (id != MixAll.MASTER_ID)
     467                                     continue;
     468                             }
     469 
     470                             try {
     471                                 int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
     472                                 if (!this.brokerVersionTable.containsKey(brokerName)) {
     473                                     this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
     474                                 }
     475                                 this.brokerVersionTable.get(brokerName).put(addr, version);
     476                                 if (times % 20 == 0) {
     477                                     log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
     478                                     log.info(heartbeatData.toString());
     479                                 }
     480                             } catch (Exception e) {
     481                                 if (this.isBrokerInNameServer(addr)) {
     482                                     log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
     483                                 } else {
     484                                     log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
     485                                         id, addr, e);
     486                                 }
     487                             }
     488                         }
     489                     }
     490                 }
     491             }
     492         }
     493     }
     494 
     495     private void uploadFilterClassSource() {
     496         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     497         while (it.hasNext()) {
     498             Entry<String, MQConsumerInner> next = it.next();
     499             MQConsumerInner consumer = next.getValue();
     500             if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
     501                 Set<SubscriptionData> subscriptions = consumer.subscriptions();
     502                 for (SubscriptionData sub : subscriptions) {
     503                     if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
     504                         final String consumerGroup = consumer.groupName();
     505                         final String className = sub.getSubString();
     506                         final String topic = sub.getTopic();
     507                         final String filterClassSource = sub.getFilterClassSource();
     508                         try {
     509                             this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
     510                         } catch (Exception e) {
     511                             log.error("uploadFilterClassToAllFilterServer Exception", e);
     512                         }
     513                     }
     514                 }
     515             }
     516         }
     517     }
     518 
     519     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
     520         DefaultMQProducer defaultMQProducer) {
     521         try {
     522             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
     523                 try {
     524                     TopicRouteData topicRouteData;
     525                     if (isDefault && defaultMQProducer != null) {
     526                         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
     527                             1000 * 3);
     528                         if (topicRouteData != null) {
     529                             for (QueueData data : topicRouteData.getQueueDatas()) {
     530                                 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
     531                                 data.setReadQueueNums(queueNums);
     532                                 data.setWriteQueueNums(queueNums);
     533                             }
     534                         }
     535                     } else {
     536                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
     537                     }
     538                     if (topicRouteData != null) {
     539                         TopicRouteData old = this.topicRouteTable.get(topic);
     540                         boolean changed = topicRouteDataIsChange(old, topicRouteData);
     541                         if (!changed) {
     542                             changed = this.isNeedUpdateTopicRouteInfo(topic);
     543                         } else {
     544                             log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
     545                         }
     546 
     547                         if (changed) {
     548                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
     549 
     550                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
     551                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
     552                             }
     553 
     554                             // Update Pub info
     555                             {
     556                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
     557                                 publishInfo.setHaveTopicRouterInfo(true);
     558                                 Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
     559                                 while (it.hasNext()) {
     560                                     Entry<String, MQProducerInner> entry = it.next();
     561                                     MQProducerInner impl = entry.getValue();
     562                                     if (impl != null) {
     563                                         impl.updateTopicPublishInfo(topic, publishInfo);
     564                                     }
     565                                 }
     566                             }
     567 
     568                             // Update sub info
     569                             {
     570                                 Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
     571                                 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     572                                 while (it.hasNext()) {
     573                                     Entry<String, MQConsumerInner> entry = it.next();
     574                                     MQConsumerInner impl = entry.getValue();
     575                                     if (impl != null) {
     576                                         impl.updateTopicSubscribeInfo(topic, subscribeInfo);
     577                                     }
     578                                 }
     579                             }
     580                             log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
     581                             this.topicRouteTable.put(topic, cloneTopicRouteData);
     582                             return true;
     583                         }
     584                     } else {
     585                         log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
     586                     }
     587                 } catch (MQClientException e) {
     588                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
     589                         log.warn("updateTopicRouteInfoFromNameServer Exception", e);
     590                     }
     591                 } catch (RemotingException e) {
     592                     log.error("updateTopicRouteInfoFromNameServer Exception", e);
     593                     throw new IllegalStateException(e);
     594                 } finally {
     595                     this.lockNamesrv.unlock();
     596                 }
     597             } else {
     598                 log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
     599             }
     600         } catch (InterruptedException e) {
     601             log.warn("updateTopicRouteInfoFromNameServer Exception", e);
     602         }
     603 
     604         return false;
     605     }
     606 
     607     private HeartbeatData prepareHeartbeatData() {
     608         HeartbeatData heartbeatData = new HeartbeatData();
     609 
     610         // clientID
     611         heartbeatData.setClientID(this.clientId);
     612 
     613         // Consumer
     614         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
     615             MQConsumerInner impl = entry.getValue();
     616             if (impl != null) {
     617                 ConsumerData consumerData = new ConsumerData();
     618                 consumerData.setGroupName(impl.groupName());
     619                 consumerData.setConsumeType(impl.consumeType());
     620                 consumerData.setMessageModel(impl.messageModel());
     621                 consumerData.setConsumeFromWhere(impl.consumeFromWhere());
     622                 consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
     623                 consumerData.setUnitMode(impl.isUnitMode());
     624 
     625                 heartbeatData.getConsumerDataSet().add(consumerData);
     626             }
     627         }
     628 
     629         // Producer
     630         for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
     631             MQProducerInner impl = entry.getValue();
     632             if (impl != null) {
     633                 ProducerData producerData = new ProducerData();
     634                 producerData.setGroupName(entry.getKey());
     635 
     636                 heartbeatData.getProducerDataSet().add(producerData);
     637             }
     638         }
     639 
     640         return heartbeatData;
     641     }
     642 
     643     private boolean isBrokerInNameServer(final String brokerAddr) {
     644         Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
     645         while (it.hasNext()) {
     646             Entry<String, TopicRouteData> itNext = it.next();
     647             List<BrokerData> brokerDatas = itNext.getValue().getBrokerDatas();
     648             for (BrokerData bd : brokerDatas) {
     649                 boolean contain = bd.getBrokerAddrs().containsValue(brokerAddr);
     650                 if (contain)
     651                     return true;
     652             }
     653         }
     654 
     655         return false;
     656     }
     657 
     658     /**
     659      * This method will be removed in the version 5.0.0,because filterServer was removed,and method
     660      * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
     661      */
     662     @Deprecated
     663     private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
     664         final String topic,
     665         final String filterClassSource) throws UnsupportedEncodingException {
     666         byte[] classBody = null;
     667         int classCRC = 0;
     668         try {
     669             classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
     670             classCRC = UtilAll.crc32(classBody);
     671         } catch (Exception e1) {
     672             log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
     673                 fullClassName,
     674                 RemotingHelper.exceptionSimpleDesc(e1));
     675         }
     676 
     677         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
     678         if (topicRouteData != null
     679             && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
     680             Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
     681             while (it.hasNext()) {
     682                 Entry<String, List<String>> next = it.next();
     683                 List<String> value = next.getValue();
     684                 for (final String fsAddr : value) {
     685                     try {
     686                         this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
     687                             5000);
     688 
     689                         log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
     690                             topic, fullClassName);
     691 
     692                     } catch (Exception e) {
     693                         log.error("uploadFilterClassToAllFilterServer Exception", e);
     694                     }
     695                 }
     696             }
     697         } else {
     698             log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
     699                 consumerGroup, topic, fullClassName);
     700         }
     701     }
     702 
     703     private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
     704         if (olddata == null || nowdata == null)
     705             return true;
     706         TopicRouteData old = olddata.cloneTopicRouteData();
     707         TopicRouteData now = nowdata.cloneTopicRouteData();
     708         Collections.sort(old.getQueueDatas());
     709         Collections.sort(old.getBrokerDatas());
     710         Collections.sort(now.getQueueDatas());
     711         Collections.sort(now.getBrokerDatas());
     712         return !old.equals(now);
     713 
     714     }
     715 
     716     private boolean isNeedUpdateTopicRouteInfo(final String topic) {
     717         boolean result = false;
     718         {
     719             Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
     720             while (it.hasNext() && !result) {
     721                 Entry<String, MQProducerInner> entry = it.next();
     722                 MQProducerInner impl = entry.getValue();
     723                 if (impl != null) {
     724                     result = impl.isPublishTopicNeedUpdate(topic);
     725                 }
     726             }
     727         }
     728 
     729         {
     730             Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     731             while (it.hasNext() && !result) {
     732                 Entry<String, MQConsumerInner> entry = it.next();
     733                 MQConsumerInner impl = entry.getValue();
     734                 if (impl != null) {
     735                     result = impl.isSubscribeTopicNeedUpdate(topic);
     736                 }
     737             }
     738         }
     739 
     740         return result;
     741     }
     742 
     743     public void shutdown() {
     744         // Consumer
     745         if (!this.consumerTable.isEmpty())
     746             return;
     747 
     748         // AdminExt
     749         if (!this.adminExtTable.isEmpty())
     750             return;
     751 
     752         // Producer
     753         if (this.producerTable.size() > 1)
     754             return;
     755 
     756         synchronized (this) {
     757             switch (this.serviceState) {
     758                 case CREATE_JUST:
     759                     break;
     760                 case RUNNING:
     761                     this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
     762 
     763                     this.serviceState = ServiceState.SHUTDOWN_ALREADY;
     764                     this.pullMessageService.shutdown(true);
     765                     this.scheduledExecutorService.shutdown();
     766                     this.mQClientAPIImpl.shutdown();
     767                     this.rebalanceService.shutdown();
     768 
     769                     MQClientManager.getInstance().removeClientFactory(this.clientId);
     770                     log.info("the client factory [{}] shutdown OK", this.clientId);
     771                     break;
     772                 case SHUTDOWN_ALREADY:
     773                     break;
     774                 default:
     775                     break;
     776             }
     777         }
     778     }
     779 
     780     public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
     781         if (null == group || null == consumer) {
     782             return false;
     783         }
     784 
     785         MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
     786         if (prev != null) {
     787             log.warn("the consumer group[" + group + "] exist already.");
     788             return false;
     789         }
     790 
     791         return true;
     792     }
     793 
     794     public void unregisterConsumer(final String group) {
     795         this.consumerTable.remove(group);
     796         this.unregisterClientWithLock(null, group);
     797     }
     798 
     799     private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
     800         try {
     801             if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
     802                 try {
     803                     this.unregisterClient(producerGroup, consumerGroup);
     804                 } catch (Exception e) {
     805                     log.error("unregisterClient exception", e);
     806                 } finally {
     807                     this.lockHeartbeat.unlock();
     808                 }
     809             } else {
     810                 log.warn("lock heartBeat, but failed. [{}]", this.clientId);
     811             }
     812         } catch (InterruptedException e) {
     813             log.warn("unregisterClientWithLock exception", e);
     814         }
     815     }
     816 
     817     private void unregisterClient(final String producerGroup, final String consumerGroup) {
     818         Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
     819         while (it.hasNext()) {
     820             Entry<String, HashMap<Long, String>> entry = it.next();
     821             String brokerName = entry.getKey();
     822             HashMap<Long, String> oneTable = entry.getValue();
     823 
     824             if (oneTable != null) {
     825                 for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
     826                     String addr = entry1.getValue();
     827                     if (addr != null) {
     828                         try {
     829                             this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
     830                             log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
     831                         } catch (RemotingException e) {
     832                             log.error("unregister client exception from broker: " + addr, e);
     833                         } catch (InterruptedException e) {
     834                             log.error("unregister client exception from broker: " + addr, e);
     835                         } catch (MQBrokerException e) {
     836                             log.error("unregister client exception from broker: " + addr, e);
     837                         }
     838                     }
     839                 }
     840             }
     841         }
     842     }
     843 
     844     public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
     845         if (null == group || null == producer) {
     846             return false;
     847         }
     848 
     849         MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
     850         if (prev != null) {
     851             log.warn("the producer group[{}] exist already.", group);
     852             return false;
     853         }
     854 
     855         return true;
     856     }
     857 
     858     public void unregisterProducer(final String group) {
     859         this.producerTable.remove(group);
     860         this.unregisterClientWithLock(group, null);
     861     }
     862 
     863     public boolean registerAdminExt(final String group, final MQAdminExtInner admin) {
     864         if (null == group || null == admin) {
     865             return false;
     866         }
     867 
     868         MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
     869         if (prev != null) {
     870             log.warn("the admin group[{}] exist already.", group);
     871             return false;
     872         }
     873 
     874         return true;
     875     }
     876 
     877     public void unregisterAdminExt(final String group) {
     878         this.adminExtTable.remove(group);
     879     }
     880 
     881     public void rebalanceImmediately() {
     882         this.rebalanceService.wakeup();
     883     }
     884 
     885     public void doRebalance() {
     886         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
     887             MQConsumerInner impl = entry.getValue();
     888             if (impl != null) {
     889                 try {
     890                     impl.doRebalance();
     891                 } catch (Throwable e) {
     892                     log.error("doRebalance exception", e);
     893                 }
     894             }
     895         }
     896     }
     897 
     898     public MQProducerInner selectProducer(final String group) {
     899         return this.producerTable.get(group);
     900     }
     901 
     902     public MQConsumerInner selectConsumer(final String group) {
     903         return this.consumerTable.get(group);
     904     }
     905 
     906     public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
     907         String brokerAddr = null;
     908         boolean slave = false;
     909         boolean found = false;
     910 
     911         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
     912         if (map != null && !map.isEmpty()) {
     913             for (Map.Entry<Long, String> entry : map.entrySet()) {
     914                 Long id = entry.getKey();
     915                 brokerAddr = entry.getValue();
     916                 if (brokerAddr != null) {
     917                     found = true;
     918                     if (MixAll.MASTER_ID == id) {
     919                         slave = false;
     920                     } else {
     921                         slave = true;
     922                     }
     923                     break;
     924 
     925                 }
     926             } // end of for
     927         }
     928 
     929         if (found) {
     930             return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
     931         }
     932 
     933         return null;
     934     }
     935 
     936     public String findBrokerAddressInPublish(final String brokerName) {
     937         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
     938         if (map != null && !map.isEmpty()) {
     939             return map.get(MixAll.MASTER_ID);
     940         }
     941 
     942         return null;
     943     }
     944 
     945     public FindBrokerResult findBrokerAddressInSubscribe(
     946         final String brokerName,
     947         final long brokerId,
     948         final boolean onlyThisBroker
     949     ) {
     950         String brokerAddr = null;
     951         boolean slave = false;
     952         boolean found = false;
     953 
     954         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
     955         if (map != null && !map.isEmpty()) {
     956             brokerAddr = map.get(brokerId);
     957             slave = brokerId != MixAll.MASTER_ID;
     958             found = brokerAddr != null;
     959 
     960             if (!found && slave) {
     961                 brokerAddr = map.get(brokerId + 1);
     962                 found = brokerAddr != null;
     963             }
     964 
     965             if (!found && !onlyThisBroker) {
     966                 Entry<Long, String> entry = map.entrySet().iterator().next();
     967                 brokerAddr = entry.getValue();
     968                 slave = entry.getKey() != MixAll.MASTER_ID;
     969                 found = true;
     970             }
     971         }
     972 
     973         if (found) {
     974             return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
     975         }
     976 
     977         return null;
     978     }
     979 
     980     public int findBrokerVersion(String brokerName, String brokerAddr) {
     981         if (this.brokerVersionTable.containsKey(brokerName)) {
     982             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
     983                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
     984             }
     985         }
     986         //To do need to fresh the version
     987         return 0;
     988     }
     989 
     990     public List<String> findConsumerIdList(final String topic, final String group) {
     991         String brokerAddr = this.findBrokerAddrByTopic(topic);
     992         if (null == brokerAddr) {
     993             this.updateTopicRouteInfoFromNameServer(topic);
     994             brokerAddr = this.findBrokerAddrByTopic(topic);
     995         }
     996 
     997         if (null != brokerAddr) {
     998             try {
     999                 return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
    1000             } catch (Exception e) {
    1001                 log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
    1002             }
    1003         }
    1004 
    1005         return null;
    1006     }
    1007 
    1008     public String findBrokerAddrByTopic(final String topic) {
    1009         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
    1010         if (topicRouteData != null) {
    1011             List<BrokerData> brokers = topicRouteData.getBrokerDatas();
    1012             if (!brokers.isEmpty()) {
    1013                 int index = random.nextInt(brokers.size());
    1014                 BrokerData bd = brokers.get(index % brokers.size());
    1015                 return bd.selectBrokerAddr();
    1016             }
    1017         }
    1018 
    1019         return null;
    1020     }
    1021 
    1022     public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    1023         DefaultMQPushConsumerImpl consumer = null;
    1024         try {
    1025             MQConsumerInner impl = this.consumerTable.get(group);
    1026             if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
    1027                 consumer = (DefaultMQPushConsumerImpl) impl;
    1028             } else {
    1029                 log.info("[reset-offset] consumer dose not exist. group={}", group);
    1030                 return;
    1031             }
    1032             consumer.suspend();
    1033 
    1034             ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
    1035             for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
    1036                 MessageQueue mq = entry.getKey();
    1037                 if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
    1038                     ProcessQueue pq = entry.getValue();
    1039                     pq.setDropped(true);
    1040                     pq.clear();
    1041                 }
    1042             }
    1043 
    1044             try {
    1045                 TimeUnit.SECONDS.sleep(10);
    1046             } catch (InterruptedException e) {
    1047             }
    1048 
    1049             Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
    1050             while (iterator.hasNext()) {
    1051                 MessageQueue mq = iterator.next();
    1052                 Long offset = offsetTable.get(mq);
    1053                 if (topic.equals(mq.getTopic()) && offset != null) {
    1054                     try {
    1055                         consumer.updateConsumeOffset(mq, offset);
    1056                         consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
    1057                         iterator.remove();
    1058                     } catch (Exception e) {
    1059                         log.warn("reset offset failed. group={}, {}", group, mq, e);
    1060                     }
    1061                 }
    1062             }
    1063         } finally {
    1064             if (consumer != null) {
    1065                 consumer.resume();
    1066             }
    1067         }
    1068     }
    1069 
    1070     public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
    1071         MQConsumerInner impl = this.consumerTable.get(group);
    1072         if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
    1073             DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
    1074             return consumer.getOffsetStore().cloneOffsetTable(topic);
    1075         } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
    1076             DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
    1077             return consumer.getOffsetStore().cloneOffsetTable(topic);
    1078         } else {
    1079             return Collections.EMPTY_MAP;
    1080         }
    1081     }
    1082 
    1083     public TopicRouteData getAnExistTopicRouteData(final String topic) {
    1084         return this.topicRouteTable.get(topic);
    1085     }
    1086 
    1087     public MQClientAPIImpl getMQClientAPIImpl() {
    1088         return mQClientAPIImpl;
    1089     }
    1090 
    1091     public MQAdminImpl getMQAdminImpl() {
    1092         return mQAdminImpl;
    1093     }
    1094 
    1095     public long 
    View Code

      producerTable:当前client实例的全部生产者的内部实例。

      consumerTable:当前client实例的全部消费者的内部实例。

      adminExtTable:当前client实例的全部管理实例。

      mQClientAPIImpl:其实每个client也是一个Netty Server,也会支持Broker访问,这里实现了全部client支持的接口。

      mQAdminImpl:管理接口的本地实现类。

      topicRouteTable:当前生产者、消费者中全部Topic的本地缓存路由信息。

      ScheduledExecutorService:本地定时任务,比如定期获取当前Namesrv地址、定期同步Namesrv信息、定期更新Topic路由信息、定期发送心跳信息给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数。

      clientRemotingProcessor:请求的处理器,从处理方法processRequest()中我们可以知道目前支持哪些功能接口。

      pullMessageService:Pull服务。

      这里为什么会启动用于消费的Pull服务呢?这是一个兼容的写法。Pull服务其实是由一个状态变量方法 this.isStopped()控制的,这个stopped状态变量默认是False,而pullRequestQueue也是空的,所以这里只是启动了pullMessageService,并没有真正地执行Pull操作,代码路径 D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplconsumerPullMessageService.java,相关如下:

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package org.apache.rocketmq.client.impl.consumer;
     18 
     19 import java.util.concurrent.Executors;
     20 import java.util.concurrent.LinkedBlockingQueue;
     21 import java.util.concurrent.ScheduledExecutorService;
     22 import java.util.concurrent.ThreadFactory;
     23 import java.util.concurrent.TimeUnit;
     24 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
     25 import org.apache.rocketmq.client.log.ClientLogger;
     26 import org.apache.rocketmq.common.ServiceThread;
     27 import org.apache.rocketmq.logging.InternalLogger;
     28 import org.apache.rocketmq.common.utils.ThreadUtils;
     29 
     30 public class PullMessageService extends ServiceThread {
     31     private final InternalLogger log = ClientLogger.getLog();
     32     private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
     33     private final MQClientInstance mQClientFactory;
     34     private final ScheduledExecutorService scheduledExecutorService = Executors
     35         .newSingleThreadScheduledExecutor(new ThreadFactory() {
     36             @Override
     37             public Thread newThread(Runnable r) {
     38                 return new Thread(r, "PullMessageServiceScheduledThread");
     39             }
     40         });
     41 
     42     public PullMessageService(MQClientInstance mQClientFactory) {
     43         this.mQClientFactory = mQClientFactory;
     44     }
     45 
     46     public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
     47         if (!isStopped()) {
     48             this.scheduledExecutorService.schedule(new Runnable() {
     49                 @Override
     50                 public void run() {
     51                     PullMessageService.this.executePullRequestImmediately(pullRequest);
     52                 }
     53             }, timeDelay, TimeUnit.MILLISECONDS);
     54         } else {
     55             log.warn("PullMessageServiceScheduledThread has shutdown");
     56         }
     57     }
     58 
     59     public void executePullRequestImmediately(final PullRequest pullRequest) {
     60         try {
     61             this.pullRequestQueue.put(pullRequest);
     62         } catch (InterruptedException e) {
     63             log.error("executePullRequestImmediately pullRequestQueue.put", e);
     64         }
     65     }
     66 
     67     public void executeTaskLater(final Runnable r, final long timeDelay) {
     68         if (!isStopped()) {
     69             this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
     70         } else {
     71             log.warn("PullMessageServiceScheduledThread has shutdown");
     72         }
     73     }
     74 
     75     public ScheduledExecutorService getScheduledExecutorService() {
     76         return scheduledExecutorService;
     77     }
     78 
     79     private void pullMessage(final PullRequest pullRequest) {
     80         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
     81         if (consumer != null) {
     82             DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
     83             impl.pullMessage(pullRequest);
     84         } else {
     85             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
     86         }
     87     }
     88 
     89     @Override
     90     public void run() {
     91         log.info(this.getServiceName() + " service started");
     92 
     93         while (!this.isStopped()) {
     94             try {
     95                 PullRequest pullRequest = this.pullRequestQueue.take();
     96                 this.pullMessage(pullRequest);
     97             } catch (InterruptedException ignored) {
     98             } catch (Exception e) {
     99                 log.error("Pull Message Service Run Method exception", e);
    100             }
    101         }
    102 
    103         log.info(this.getServiceName() + " service end");
    104     }
    105 
    106     @Override
    107     public void shutdown(boolean interrupt) {
    108         super.shutdown(interrupt);
    109         ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
    110     }
    111 
    112     @Override
    113     public String getServiceName() {
    114         return PullMessageService.class.getSimpleName();
    115     }
    116 
    117 }
    View Code

      rebalanceService:重新平衡服务。定期执行重新平衡方法 this.mqClientFactory.doRebalance()。代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplconsumerRebalanceService.java,代码如下:

     1 /*
     2  * Licensed to the Apache Software Foundation (ASF) under one or more
     3  * contributor license agreements.  See the NOTICE file distributed with
     4  * this work for additional information regarding copyright ownership.
     5  * The ASF licenses this file to You under the Apache License, Version 2.0
     6  * (the "License"); you may not use this file except in compliance with
     7  * the License.  You may obtain a copy of the License at
     8  *
     9  *     http://www.apache.org/licenses/LICENSE-2.0
    10  *
    11  * Unless required by applicable law or agreed to in writing, software
    12  * distributed under the License is distributed on an "AS IS" BASIS,
    13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  * See the License for the specific language governing permissions and
    15  * limitations under the License.
    16  */
    17 package org.apache.rocketmq.client.impl.consumer;
    18 
    19 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
    20 import org.apache.rocketmq.client.log.ClientLogger;
    21 import org.apache.rocketmq.common.ServiceThread;
    22 import org.apache.rocketmq.logging.InternalLogger;
    23 
    24 public class RebalanceService extends ServiceThread {
    25     private static long waitInterval =
    26         Long.parseLong(System.getProperty(
    27             "rocketmq.client.rebalance.waitInterval", "20000"));
    28     private final InternalLogger log = ClientLogger.getLog();
    29     private final MQClientInstance mqClientFactory;
    30 
    31     public RebalanceService(MQClientInstance mqClientFactory) {
    32         this.mqClientFactory = mqClientFactory;
    33     }
    34 
    35     @Override
    36     public void run() {
    37         log.info(this.getServiceName() + " service started");
    38 
    39         while (!this.isStopped()) {
    40             this.waitForRunning(waitInterval);
    41             this.mqClientFactory.doRebalance();
    42         }
    43 
    44         log.info(this.getServiceName() + " service end");
    45     }
    46 
    47     @Override
    48     public String getServiceName() {
    49         return RebalanceService.class.getSimpleName();
    50     }
    51 }
    View Code

      这里的 mqClientFactory 就是 MQClientInstance 实例,通过依次调用 MQClientInstance 中保存的消费者实例的 doRebalance() 方法,来感知订阅关系的变化、集群变化等,以达到重新平衡。

      consumerStartsManager:消费监控。比如拉取RT(Response Time,响应时间)、拉取TPS(Transactions Per Second,每秒处理消息数)、消费RT等都可以统计。

    MQClientInstance 中还有一些核心方法,代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplfactoryMQClientInstance.java:

      updateTopicRouteInfoFromNameServer:从多个Namesrv中获取最新Topic路由信息,更新本地缓存。

     1     public void updateTopicRouteInfoFromNameServer() {
     2         Set<String> topicList = new HashSet<String>();
     3 
     4         // Consumer
     5         {
     6             Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     7             while (it.hasNext()) {
     8                 Entry<String, MQConsumerInner> entry = it.next();
     9                 MQConsumerInner impl = entry.getValue();
    10                 if (impl != null) {
    11                     Set<SubscriptionData> subList = impl.subscriptions();
    12                     if (subList != null) {
    13                         for (SubscriptionData subData : subList) {
    14                             topicList.add(subData.getTopic());
    15                         }
    16                     }
    17                 }
    18             }
    19         }
    20 
    21         // Producer
    22         {
    23             Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    24             while (it.hasNext()) {
    25                 Entry<String, MQProducerInner> entry = it.next();
    26                 MQProducerInner impl = entry.getValue();
    27                 if (impl != null) {
    28                     Set<String> lst = impl.getPublishTopicList();
    29                     topicList.addAll(lst);
    30                 }
    31             }
    32         }
    33 
    34         for (String topic : topicList) {
    35             this.updateTopicRouteInfoFromNameServer(topic);
    36         }
    37     }
    updateTopicRouteInfoFromNameServer()

       cleanOfflineBroker:清理已经下线的Broker。

     1    /**
     2      * Remove offline broker
     3      */
     4     private void cleanOfflineBroker() {
     5         try {
     6             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
     7                 try {
     8                     ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
     9 
    10                     Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
    11                     while (itBrokerTable.hasNext()) {
    12                         Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
    13                         String brokerName = entry.getKey();
    14                         HashMap<Long, String> oneTable = entry.getValue();
    15 
    16                         HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
    17                         cloneAddrTable.putAll(oneTable);
    18 
    19                         Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
    20                         while (it.hasNext()) {
    21                             Entry<Long, String> ee = it.next();
    22                             String addr = ee.getValue();
    23                             if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
    24                                 it.remove();
    25                                 log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
    26                             }
    27                         }
    28 
    29                         if (cloneAddrTable.isEmpty()) {
    30                             itBrokerTable.remove();
    31                             log.info("the broker[{}] name's host is offline, remove it", brokerName);
    32                         } else {
    33                             updatedTable.put(brokerName, cloneAddrTable);
    34                         }
    35                     }
    36 
    37                     if (!updatedTable.isEmpty()) {
    38                         this.brokerAddrTable.putAll(updatedTable);
    39                     }
    40                 } finally {
    41                     this.lockNamesrv.unlock();
    42                 }
    43         } catch (InterruptedException e) {
    44             log.warn("cleanOfflineBroker Exception", e);
    45         }
    46     }
    cleanOfflineBroker()

      checkClientInBroker:检查Client是否在Broker中有效。

     1     public void checkClientInBroker() throws MQClientException {
     2         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
     3 
     4         while (it.hasNext()) {
     5             Entry<String, MQConsumerInner> entry = it.next();
     6             Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
     7             if (subscriptionInner == null || subscriptionInner.isEmpty()) {
     8                 return;
     9             }
    10 
    11             for (SubscriptionData subscriptionData : subscriptionInner) {
    12                 if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    13                     continue;
    14                 }
    15                 // may need to check one broker every cluster...
    16                 // assume that the configs of every broker in cluster are the the same.
    17                 String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
    18 
    19                 if (addr != null) {
    20                     try {
    21                         this.getMQClientAPIImpl().checkClientInBroker(
    22                             addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
    23                         );
    24                     } catch (Exception e) {
    25                         if (e instanceof MQClientException) {
    26                             throw (MQClientException) e;
    27                         } else {
    28                             throw new MQClientException("Check client in broker error, maybe because you use "
    29                                 + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
    30                                 + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
    31                                 "have use the new features which are not supported by server, please check the log!", e);
    32                         }
    33                     }
    34                 }
    35             }
    36         }
    37     }
    checkClientInBroker()

      sendHeartbeatToAllBrokerWithLock:发送客户端的心跳信息给所有的 Broker。

     1     public void sendHeartbeatToAllBrokerWithLock() {
     2         if (this.lockHeartbeat.tryLock()) {
     3             try {
     4                 this.sendHeartbeatToAllBroker();
     5                 this.uploadFilterClassSource();
     6             } catch (final Exception e) {
     7                 log.error("sendHeartbeatToAllBroker exception", e);
     8             } finally {
     9                 this.lockHeartbeat.unlock();
    10             }
    11         } else {
    12             log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    13         }
    14     }
    sendHeartbeatToAllBrokerWithLock()

      registerConsumer:在本地注册一个消费者。

     1     public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
     2         ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
     3         final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
     4 
     5         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
     6         if (null == consumerGroupInfo) {
     7             ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
     8             ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
     9             consumerGroupInfo = prev != null ? prev : tmp;
    10         }
    11 
    12         boolean r1 =
    13             consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
    14                 consumeFromWhere);
    15         boolean r2 = consumerGroupInfo.updateSubscription(subList);
    16 
    17         if (r1 || r2) {
    18             if (isNotifyConsumerIdsChangedEnable) {
    19                 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    20             }
    21         }
    22 
    23         this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    24 
    25         return r1 || r2;
    26     }
    registerConsumer()

      unregisterConsumer:取消本地注册的消费者。

     1     public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
     2         boolean isNotifyConsumerIdsChangedEnable) {
     3         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
     4         if (null != consumerGroupInfo) {
     5             consumerGroupInfo.unregisterChannel(clientChannelInfo);
     6             if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
     7                 ConsumerGroupInfo remove = this.consumerTable.remove(group);
     8                 if (remove != null) {
     9                     log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
    10 
    11                     this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
    12                 }
    13             }
    14             if (isNotifyConsumerIdsChangedEnable) {
    15                 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    16             }
    17         }
    18     }
    unregisterConsumer()

      registerProducer:在本地注册一个生产者。

     1     public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
     2         ClientChannelInfo clientChannelInfoFound = null;
     3 
     4         ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
     5         if (null == channelTable) {
     6             channelTable = new ConcurrentHashMap<>();
     7             this.groupChannelTable.put(group, channelTable);
     8         }
     9 
    10         clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
    11         if (null == clientChannelInfoFound) {
    12             channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
    13             clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
    14             log.info("new producer connected, group: {} channel: {}", group,
    15                     clientChannelInfo.toString());
    16         }
    17 
    18 
    19         if (clientChannelInfoFound != null) {
    20             clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
    21         }
    22     }
    registerProducer()

      unregisterProducer:取消本地注册的生产者。

     1     public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
     2         ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
     3         if (null != channelTable && !channelTable.isEmpty()) {
     4             ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
     5             clientChannelTable.remove(clientChannelInfo.getClientId());
     6             if (old != null) {
     7                 log.info("unregister a producer[{}] from groupChannelTable {}", group,
     8                         clientChannelInfo.toString());
     9             }
    10 
    11             if (channelTable.isEmpty()) {
    12                 this.groupChannelTable.remove(group);
    13                 log.info("unregister a producer group[{}] from groupChannelTable", group);
    14             }
    15         }
    16     }
    unregisterProducer()

      rebalanceImmediately:立即执行一次 Rebalance。该操作是通过 RocketMQ的一个 CountDownLatch2 锁来实现的。

    1     public void rebalanceImmediately() {
    2         this.rebalanceService.wakeup();
    3     }
    rebalanceImmediately()
    1     public void wakeup() {
    2         if (hasNotified.compareAndSet(false, true)) {
    3             waitPoint.countDown(); // notify
    4         }
    5     }
    wakeup()
    1     public void countDown() {
    2         sync.releaseShared(1);
    3     }
    countDown()

      doRebalance:对于所有已经注册的消费者实例,执行一次Rebalance。

     1     public void doRebalance(final boolean isOrder) {
     2         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
     3         if (subTable != null) {
     4             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
     5                 final String topic = entry.getKey();
     6                 try {
     7                     this.rebalanceByTopic(topic, isOrder);
     8                 } catch (Throwable e) {
     9                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    10                         log.warn("rebalanceByTopic Exception", e);
    11                     }
    12                 }
    13             }
    14         }
    15 
    16         this.truncateMessageQueueNotMyTopic();
    17     }
    doRebalance()

      findBrokerAddressInAdmin:在本地缓存中查找 Master 或者 Slave Broker 信息。

     1     public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
     2         String brokerAddr = null;
     3         boolean slave = false;
     4         boolean found = false;
     5 
     6         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
     7         if (map != null && !map.isEmpty()) {
     8             for (Map.Entry<Long, String> entry : map.entrySet()) {
     9                 Long id = entry.getKey();
    10                 brokerAddr = entry.getValue();
    11                 if (brokerAddr != null) {
    12                     found = true;
    13                     if (MixAll.MASTER_ID == id) {
    14                         slave = false;
    15                     } else {
    16                         slave = true;
    17                     }
    18                     break;
    19 
    20                 }
    21             } // end of for
    22         }
    23 
    24         if (found) {
    25             return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    26         }
    27 
    28         return null;
    29     }
    findBrokerAddressInAdmin()

      findBrokerAddressInSubscribe:在本地缓存中查找Slave Broker信息。

     1     public FindBrokerResult findBrokerAddressInSubscribe(
     2         final String brokerName,
     3         final long brokerId,
     4         final boolean onlyThisBroker
     5     ) {
     6         String brokerAddr = null;
     7         boolean slave = false;
     8         boolean found = false;
     9 
    10         HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    11         if (map != null && !map.isEmpty()) {
    12             brokerAddr = map.get(brokerId);
    13             slave = brokerId != MixAll.MASTER_ID;
    14             found = brokerAddr != null;
    15 
    16             if (!found && slave) {
    17                 brokerAddr = map.get(brokerId + 1);
    18                 found = brokerAddr != null;
    19             }
    20 
    21             if (!found && !onlyThisBroker) {
    22                 Entry<Long, String> entry = map.entrySet().iterator().next();
    23                 brokerAddr = entry.getValue();
    24                 slave = entry.getKey() != MixAll.MASTER_ID;
    25                 found = true;
    26             }
    27         }
    28 
    29         if (found) {
    30             return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    31         }
    32 
    33         return null;
    34     }
    findBrokerAddressInSubscribe()

      findBrokerAddressInPublish:在本地缓存中查找 Master Broker 地址。

    1 public String findBrokerAddressInPublish(final String brokerName) {
    2     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    3     if (map != null && !map.isEmpty()) {
    4         return map.get(MixAll.MASTER_ID);
    5     }
    6 
    7     return null;
    8 }
    findBrokerAddressInPublish()

      findConsumerIdList:查找消费者id列表。向Broker端发送获取该消费组下消费者Id列表的RPC通信请求(Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP)

     1 public List<String> findConsumerIdList(final String topic, final String group) {
     2         String brokerAddr = this.findBrokerAddrByTopic(topic);
     3         if (null == brokerAddr) {
     4             this.updateTopicRouteInfoFromNameServer(topic);
     5             brokerAddr = this.findBrokerAddrByTopic(topic);
     6         }
     7 
     8         if (null != brokerAddr) {
     9             try {
    10                 return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
    11             } catch (Exception e) {
    12                 log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
    13             }
    14         }
    15 
    16         return null;
    17     }
    findConsumerIdList()

      findBrokerAddrByTopic:通过 Topic 名字查找 Broker 地址。

     1     public String findBrokerAddrByTopic(final String topic) {
     2         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
     3         if (topicRouteData != null) {
     4             List<BrokerData> brokers = topicRouteData.getBrokerDatas();
     5             if (!brokers.isEmpty()) {
     6                 int index = random.nextInt(brokers.size());
     7                 BrokerData bd = brokers.get(index % brokers.size());
     8                 return bd.selectBrokerAddr();
     9             }
    10         }
    11 
    12         return null;
    13     }
    findBrokerAddrByTopic()

      resetOffset:重置消费位点。

     1     public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
     2         DefaultMQPushConsumerImpl consumer = null;
     3         try {
     4             MQConsumerInner impl = this.consumerTable.get(group);
     5             if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
     6                 consumer = (DefaultMQPushConsumerImpl) impl;
     7             } else {
     8                 log.info("[reset-offset] consumer dose not exist. group={}", group);
     9                 return;
    10             }
    11             consumer.suspend();
    12 
    13             ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
    14             for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
    15                 MessageQueue mq = entry.getKey();
    16                 if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
    17                     ProcessQueue pq = entry.getValue();
    18                     pq.setDropped(true);
    19                     pq.clear();
    20                 }
    21             }
    22 
    23             try {
    24                 TimeUnit.SECONDS.sleep(10);
    25             } catch (InterruptedException e) {
    26             }
    27 
    28             Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
    29             while (iterator.hasNext()) {
    30                 MessageQueue mq = iterator.next();
    31                 Long offset = offsetTable.get(mq);
    32                 if (topic.equals(mq.getTopic()) && offset != null) {
    33                     try {
    34                         consumer.updateConsumeOffset(mq, offset);
    35                         consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
    36                         iterator.remove();
    37                     } catch (Exception e) {
    38                         log.warn("reset offset failed. group={}, {}", group, mq, e);
    39                     }
    40                 }
    41             }
    42         } finally {
    43             if (consumer != null) {
    44                 consumer.resume();
    45             }
    46         }
    47     }
    resetOffset()

      getConsumerStatus:获取一个订阅关系中每个队列的消费进度。

     1     public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
     2         MQConsumerInner impl = this.consumerTable.get(group);
     3         if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
     4             DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
     5             return consumer.getOffsetStore().cloneOffsetTable(topic);
     6         } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
     7             DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
     8             return consumer.getOffsetStore().cloneOffsetTable(topic);
     9         } else {
    10             return Collections.EMPTY_MAP;
    11         }
    12     }
    getConsumerStatus()

      getTopicRouteTable:获取本地缓存 Topic 路由。

    1     public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {
    2         return topicRouteTable;
    3     }
    getTopicRouteTable()

      consumeMessageDirectly:直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的消费者组中的一个,而不是全部已经订阅的消费者。一般适用于在消费消息后,某一个消费者组想再消费一次的场景。

    1     @Override
    2     public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
    3         throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
    4         return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
    5     }
    consumeMessageDirectly()
    1     @Override
    2     public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
    3         throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
    4         MessageExt msg = this.viewMessage(msgId);
    5 
    6         return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
    7             consumerGroup, clientId, msgId, timeoutMillis * 3);
    8     }
    consumeMessageDirectly()

      consumerRunningInfo:获取消费者的消费统计信息。包含消费RT、消费TPS等。

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package org.apache.rocketmq.common.protocol.body;
     19 
     20 import java.util.Iterator;
     21 import java.util.Map.Entry;
     22 import java.util.Properties;
     23 import java.util.TreeMap;
     24 import java.util.TreeSet;
     25 import org.apache.rocketmq.common.message.MessageQueue;
     26 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
     27 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
     28 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
     29 
     30 public class ConsumerRunningInfo extends RemotingSerializable {
     31     public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
     32     public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
     33     public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
     34     public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
     35     public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
     36     public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
     37 
     38     private Properties properties = new Properties();
     39 
     40     private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
     41 
     42     private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
     43 
     44     private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
     45 
     46     private String jstack;
     47 
     48     public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
     49         ConsumerRunningInfo prev = criTable.firstEntry().getValue();
     50 
     51         boolean push = false;
     52         {
     53             String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
     54 
     55             if (property == null) {
     56                 property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
     57             }
     58             push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
     59         }
     60 
     61         boolean startForAWhile = false;
     62         {
     63 
     64             String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP);
     65             if (property == null) {
     66                 property = String.valueOf(prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP));
     67             }
     68             startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2);
     69         }
     70 
     71         if (push && startForAWhile) {
     72 
     73             {
     74                 Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
     75                 while (it.hasNext()) {
     76                     Entry<String, ConsumerRunningInfo> next = it.next();
     77                     ConsumerRunningInfo current = next.getValue();
     78                     boolean equals = current.getSubscriptionSet().equals(prev.getSubscriptionSet());
     79 
     80                     if (!equals) {
     81                         // Different subscription in the same group of consumer
     82                         return false;
     83                     }
     84 
     85                     prev = next.getValue();
     86                 }
     87 
     88                 if (prev != null) {
     89 
     90                     if (prev.getSubscriptionSet().isEmpty()) {
     91                         // Subscription empty!
     92                         return false;
     93                     }
     94                 }
     95             }
     96         }
     97 
     98         return true;
     99     }
    100 
    101     public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
    102         return true;
    103     }
    104 
    105     public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
    106         StringBuilder sb = new StringBuilder();
    107         boolean push = false;
    108         {
    109             String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
    110 
    111             if (property == null) {
    112                 property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
    113             }
    114             push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
    115         }
    116 
    117         boolean orderMsg = false;
    118         {
    119             String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY);
    120             orderMsg = Boolean.parseBoolean(property);
    121         }
    122 
    123         if (push) {
    124             Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator();
    125             while (it.hasNext()) {
    126                 Entry<MessageQueue, ProcessQueueInfo> next = it.next();
    127                 MessageQueue mq = next.getKey();
    128                 ProcessQueueInfo pq = next.getValue();
    129 
    130                 if (orderMsg) {
    131 
    132                     if (!pq.isLocked()) {
    133                         sb.append(String.format("%s %s can't lock for a while, %dms%n",
    134                             clientId,
    135                             mq,
    136                             System.currentTimeMillis() - pq.getLastLockTimestamp()));
    137                     } else {
    138                         if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
    139                             sb.append(String.format("%s %s unlock %d times, still failed%n",
    140                                 clientId,
    141                                 mq,
    142                                 pq.getTryUnlockTimes()));
    143                         }
    144                     }
    145 
    146                 } else {
    147                     long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
    148 
    149                     if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
    150                         sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n",
    151                             clientId,
    152                             mq,
    153                             diff));
    154                     }
    155                 }
    156             }
    157         }
    158 
    159         return sb.toString();
    160     }
    161 
    162     public Properties getProperties() {
    163         return properties;
    164     }
    165 
    166     public void setProperties(Properties properties) {
    167         this.properties = properties;
    168     }
    169 
    170     public TreeSet<SubscriptionData> getSubscriptionSet() {
    171         return subscriptionSet;
    172     }
    173 
    174     public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) {
    175         this.subscriptionSet = subscriptionSet;
    176     }
    177 
    178     public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() {
    179         return mqTable;
    180     }
    181 
    182     public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) {
    183         this.mqTable = mqTable;
    184     }
    185 
    186     public TreeMap<String, ConsumeStatus> getStatusTable() {
    187         return statusTable;
    188     }
    189 
    190     public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) {
    191         this.statusTable = statusTable;
    192     }
    193 
    194     public String formatString() {
    195         StringBuilder sb = new StringBuilder();
    196 
    197         {
    198             sb.append("#Consumer Properties#
    ");
    199             Iterator<Entry<Object, Object>> it = this.properties.entrySet().iterator();
    200             while (it.hasNext()) {
    201                 Entry<Object, Object> next = it.next();
    202                 String item = String.format("%-40s: %s%n", next.getKey().toString(), next.getValue().toString());
    203                 sb.append(item);
    204             }
    205         }
    206 
    207         {
    208             sb.append("
    
    #Consumer Subscription#
    ");
    209 
    210             Iterator<SubscriptionData> it = this.subscriptionSet.iterator();
    211             int i = 0;
    212             while (it.hasNext()) {
    213                 SubscriptionData next = it.next();
    214                 String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n",
    215                     ++i,
    216                     next.getTopic(),
    217                     next.isClassFilterMode(),
    218                     next.getSubString());
    219 
    220                 sb.append(item);
    221             }
    222         }
    223 
    224         {
    225             sb.append("
    
    #Consumer Offset#
    ");
    226             sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n",
    227                 "#Topic",
    228                 "#Broker Name",
    229                 "#QID",
    230                 "#Consumer Offset"
    231             ));
    232 
    233             Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
    234             while (it.hasNext()) {
    235                 Entry<MessageQueue, ProcessQueueInfo> next = it.next();
    236                 String item = String.format("%-32s  %-32s  %-4d  %-20d%n",
    237                     next.getKey().getTopic(),
    238                     next.getKey().getBrokerName(),
    239                     next.getKey().getQueueId(),
    240                     next.getValue().getCommitOffset());
    241 
    242                 sb.append(item);
    243             }
    244         }
    245 
    246         {
    247             sb.append("
    
    #Consumer MQ Detail#
    ");
    248             sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n",
    249                 "#Topic",
    250                 "#Broker Name",
    251                 "#QID",
    252                 "#ProcessQueueInfo"
    253             ));
    254 
    255             Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
    256             while (it.hasNext()) {
    257                 Entry<MessageQueue, ProcessQueueInfo> next = it.next();
    258                 String item = String.format("%-32s  %-32s  %-4d  %s%n",
    259                     next.getKey().getTopic(),
    260                     next.getKey().getBrokerName(),
    261                     next.getKey().getQueueId(),
    262                     next.getValue().toString());
    263 
    264                 sb.append(item);
    265             }
    266         }
    267 
    268         {
    269             sb.append("
    
    #Consumer RT&TPS#
    ");
    270             sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s%n",
    271                 "#Topic",
    272                 "#Pull RT",
    273                 "#Pull TPS",
    274                 "#Consume RT",
    275                 "#ConsumeOK TPS",
    276                 "#ConsumeFailed TPS",
    277                 "#ConsumeFailedMsgsInHour"
    278             ));
    279 
    280             Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
    281             while (it.hasNext()) {
    282                 Entry<String, ConsumeStatus> next = it.next();
    283                 String item = String.format("%-32s  %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n",
    284                     next.getKey(),
    285                     next.getValue().getPullRT(),
    286                     next.getValue().getPullTPS(),
    287                     next.getValue().getConsumeRT(),
    288                     next.getValue().getConsumeOKTPS(),
    289                     next.getValue().getConsumeFailedTPS(),
    290                     next.getValue().getConsumeFailedMsgs()
    291                 );
    292 
    293                 sb.append(item);
    294             }
    295         }
    296 
    297         if (this.jstack != null) {
    298             sb.append("
    
    #Consumer jstack#
    ");
    299             sb.append(this.jstack);
    300         }
    301 
    302         return sb.toString();
    303     }
    304 
    305     public String getJstack() {
    306         return jstack;
    307     }
    308 
    309     public void setJstack(String jstack) {
    310         this.jstack = jstack;
    311     }
    312 
    313 }
    ConsumerRunningInfo()
     
  • 相关阅读:
    微信支付Native扫码支付模式二之CodeIgniter集成篇
    如何使用硬盘安装debian8.3?
    使用git将代码push到osc上
    树莓派(Raspberry Pi)搭建简单的lamp服务
    win下修改mysql默认的字符集以防止乱码出现
    CodeIgniter2.2.0-在控制器里调用load失败报错的问题
    Ubuntu Server(Ubuntu 14.04 LTS 64位)安装libgdiplus2.10.9出错问题记录
    linux下mono的安装与卸载
    asp.net中ashx生成验证码代码放在Linux(centos)主机上访问时无法显示问题
    使用NPOI将数据导出为word格式里的table
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14389048.html
Copyright © 2011-2022 走看看