  • Source Code Analysis of Producer Startup in RocketMQ

    In RocketMQ, a Producer is created through DefaultMQProducer.

    DefaultMQProducer is defined as follows:

        public class DefaultMQProducer extends ClientConfig implements MQProducer {
            protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

            // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC has the value "TBW102"
            private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

            private volatile int defaultTopicQueueNums = 4;

            private int sendMsgTimeout = 3000;

            private int compressMsgBodyOverHowmuch = 1024 * 4;

            private int retryTimesWhenSendFailed = 2;

            private int retryTimesWhenSendAsyncFailed = 2;

            private boolean retryAnotherBrokerWhenNotStoreOK = false;

            private int maxMessageSize = 1024 * 1024 * 4; // 4M
        }

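    The defaultMQProducerImpl member is the actual Producer implementation; the remaining members are configuration parameters:
    createTopicKey: a topic name used when topics are created (the constant is AUTO_CREATE_TOPIC_KEY_TOPIC); explained later
    defaultTopicQueueNums: the default number of queues per topic
    sendMsgTimeout: the send timeout, in milliseconds
    compressMsgBodyOverHowmuch: the message body size threshold, in bytes; bodies larger than this are compressed
    retryTimesWhenSendFailed: the number of retries allowed when a synchronous send fails
    retryTimesWhenSendAsyncFailed: the number of retries allowed when an asynchronous send fails
    retryAnotherBrokerWhenNotStoreOK: whether to pick another Broker and resend when sending to a Broker fails
    maxMessageSize: the maximum message size, in bytes
    All of these properties can be read and changed through the getters and setters that DefaultMQProducer provides.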

    The commonly used constructors are:

        public DefaultMQProducer() {
            this(MixAll.DEFAULT_PRODUCER_GROUP, null);
        }

        public DefaultMQProducer(final String producerGroup) {
            this(producerGroup, null);
        }

        public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
            this.producerGroup = producerGroup;
            defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        }


    DefaultMQProducer extends ClientConfig, so the lower-level parameters provided by ClientConfig are initialized first:

        public class ClientConfig {
            public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";

            private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

            private String clientIP = RemotingUtil.getLocalAddress();

            private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

            private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

            private int pollNameServerInterval = 1000 * 30;

            private int heartbeatBrokerInterval = 1000 * 30;

            private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
        }

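    namesrvAddr is a very important member: it holds the address of the Name Server. At construction time it is initialized from a system property, falling back to an environment variable; if neither is set it is null and must be set later through its setter.
    clientIP: the local IP address of the Producer
    instanceName: the instance name of the Producer
    pollNameServerInterval: the interval for polling the NameServer, in milliseconds
    heartbeatBrokerInterval: the interval for sending heartbeats to the Broker, in milliseconds
    SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY and vipChannelEnabled: determine whether the VIP (high-priority) channel is used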
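
    The lookup order used for namesrvAddr (system property first, environment variable second) can be reproduced with the JDK alone. This is only a sketch: the class name NamesrvAddrLookup is hypothetical, and the two key values are assumptions about MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV, which this article does not show.

```java
// Hypothetical demo class; not part of RocketMQ.
final class NamesrvAddrLookup {
    // Assumed values of MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV.
    static final String PROPERTY = "rocketmq.namesrv.addr";
    static final String ENV = "NAMESRV_ADDR";

    /** Returns the system property if set, otherwise the environment variable, otherwise null. */
    static String resolve(String propertyKey, String envKey) {
        return System.getProperty(propertyKey, System.getenv(envKey));
    }

    public static void main(String[] args) {
        System.out.println("before: " + resolve(PROPERTY, ENV));
        System.setProperty(PROPERTY, "127.0.0.1:9876");
        System.out.println("after:  " + resolve(PROPERTY, ENV));
    }
}
```

    Because the field initializer runs when the ClientConfig object is constructed, a property set after construction has no effect on that object; in that case the setter has to be used.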

    Back in the DefaultMQProducer constructor, a DefaultMQProducerImpl instance is created:

        private final Random random = new Random();
        private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
                new ConcurrentHashMap<String, TopicPublishInfo>();
        private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
        private final RPCHook rpcHook;
        protected BlockingQueue<Runnable> checkRequestQueue;
        protected ExecutorService checkExecutor;
        private ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
        private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
        private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
        private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
        private final ExecutorService defaultAsyncSenderExecutor;
        private ExecutorService asyncSenderExecutor;

        public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
            this.defaultMQProducer = defaultMQProducer;
            this.rpcHook = rpcHook;

            this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
            this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.asyncSenderThreadPoolQueue,
                new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                    }
                });
        }

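    The constructor creates a thread pool that handles asynchronous message sending.
    The topicPublishInfoTable member is important: it is a map from each topic to its message queue information, and is described in detail later.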
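
    The thread pool built in the constructor is a plain JDK ThreadPoolExecutor: a fixed number of threads, a bounded queue, and a ThreadFactory that numbers the worker threads. The following stand-alone sketch uses the same settings; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only mirrors the pool settings from the constructor above.
final class AsyncSenderPoolSketch {

    /** Fixed-size pool (core size == max size) with a bounded queue and named threads. */
    static ExecutorService newPool(int threads, int queueCapacity) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + threadIndex.incrementAndGet());
            }
        };
        return new ThreadPoolExecutor(threads, threads, 1000 * 60, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity), factory);
    }

    /** Runs one task on a fresh pool and returns the name of the worker thread that ran it. */
    static String workerNameOfFirstTask() {
        ExecutorService pool = newPool(2, 100);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on " + workerNameOfFirstTask());
    }
}
```

    Since no rejection handler is passed, the JDK default (AbortPolicy) applies: once all threads are busy and the queue is full, further submissions are rejected with a RejectedExecutionException.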


    Once the DefaultMQProducer has been created, look at its start method:

        public void start() throws MQClientException {
            this.defaultMQProducerImpl.start();
            if (null != traceDispatcher) {
                try {
                    traceDispatcher.start(this.getNamesrvAddr());
                } catch (MQClientException e) {
                    log.warn("trace dispatcher start failed ", e);
                }
            }
        }

    It first delegates to the start method of defaultMQProducerImpl.

    The start method of DefaultMQProducerImpl:

        public void start() throws MQClientException {
            this.start(true);
        }

        public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;

                    this.checkConfig();

                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }

                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }

                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                    if (startFactory) {
                        mQClientFactory.start();
                    }

                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }

            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }

    When the DefaultMQProducerImpl is instantiated, serviceState is initialized to CREATE_JUST. This is an enum value; the possible states are:

        public enum ServiceState {
            CREATE_JUST,
            RUNNING,
            SHUTDOWN_ALREADY,
            START_FAILED;

            private ServiceState() {
            }
        }

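    These states are self-explanatory, and they are used again later in MQClientInstance.

    Back in the start method: it switches on serviceState and only proceeds normally in the CREATE_JUST state, which prevents start from being called by mistake in any other state.

    Look directly at the CREATE_JUST case: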

        this.serviceState = ServiceState.START_FAILED;

        this.checkConfig();

        if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
            this.defaultMQProducer.changeInstanceNameToPID();
        }

        this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
        if (!registerOK) {
            this.serviceState = ServiceState.CREATE_JUST;
            throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                null);
        }

        this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

        if (startFactory) {
            mQClientFactory.start();
        }

        log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
            this.defaultMQProducer.isSendMessageWithVIPChannel());
        this.serviceState = ServiceState.RUNNING;
        break;

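    It first sets serviceState to START_FAILED, so that if any later step throws, the producer is left in the START_FAILED state instead of looking as if it had never been started.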
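
    The guard can be illustrated with a small stand-alone state machine. This is a simplified sketch rather than RocketMQ code: the class LifecycleSketch and its Runnable parameter are hypothetical, and the special case that resets the state to CREATE_JUST when registration fails is left out.

```java
// Hypothetical demo class illustrating the state guard; not RocketMQ code.
final class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    ServiceState state() {
        return state;
    }

    /** Runs the startup steps once; any state other than CREATE_JUST is rejected. */
    void start(Runnable startupSteps) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistic marker: if startupSteps throws, the state stays START_FAILED.
                state = ServiceState.START_FAILED;
                startupSteps.run();
                state = ServiceState.RUNNING;
                break;
            default:
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch ok = new LifecycleSketch();
        ok.start(() -> { });
        System.out.println("after a clean start: " + ok.state());

        LifecycleSketch broken = new LifecycleSketch();
        try {
            broken.start(() -> { throw new RuntimeException("config check failed"); });
        } catch (RuntimeException e) {
            System.out.println("after a failed start: " + broken.state());
        }
    }
}
```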

    The checkConfig method validates the ProducerGroup name:

        private void checkConfig() throws MQClientException {
            Validators.checkGroup(this.defaultMQProducer.getProducerGroup());

            if (null == this.defaultMQProducer.getProducerGroup()) {
                throw new MQClientException("producerGroup is null", null);
            }

            if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
                throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                    null);
            }
        }

    It mainly checks that the name is legal and that it does not collide with the default producer group name, DEFAULT_PRODUCER_GROUP:

        public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
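
    The article does not show Validators.checkGroup, so the following is only a rough sketch of what such a check might look like. The character pattern and the 255-character limit are assumptions, and GroupNameCheckSketch is a hypothetical class; only the comparison against "DEFAULT_PRODUCER" is taken from the code above.

```java
import java.util.regex.Pattern;

// Hypothetical demo class; the pattern and the length limit are assumptions.
final class GroupNameCheckSketch {
    static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    static final int MAX_LENGTH = 255;
    static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";

    /** Throws IllegalArgumentException when the group name would be rejected. */
    static void check(String group) {
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("producerGroup is blank");
        }
        if (!VALID.matcher(group).matches()) {
            throw new IllegalArgumentException("producerGroup contains illegal characters: " + group);
        }
        if (group.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("producerGroup is longer than " + MAX_LENGTH);
        }
        if (DEFAULT_PRODUCER_GROUP.equals(group)) {
            throw new IllegalArgumentException("producerGroup can not equal " + DEFAULT_PRODUCER_GROUP);
        }
    }

    /** Convenience wrapper that reports the result as a boolean. */
    static boolean isAcceptable(String group) {
        try {
            check(group);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAcceptable("order_producer"));
        System.out.println(isAcceptable("DEFAULT_PRODUCER"));
    }
}
```

    One practical consequence of the real checkConfig: a producer created with the no-argument constructor uses DEFAULT_PRODUCER_GROUP and therefore cannot be started until a different group name is set.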


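    Next, mQClientFactory is instantiated; this is the client instance the producer uses. MQClientManager is a singleton, and getInstance returns that singleton. Based on the given ClientConfig, its getAndCreateMQClientInstance method returns a client instance with the corresponding properties.

    MQClientManager: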

        public class MQClientManager {
            private final static InternalLogger log = ClientLogger.getLog();
            private static MQClientManager instance = new MQClientManager();
            private AtomicInteger factoryIndexGenerator = new AtomicInteger();
            private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
                new ConcurrentHashMap<String, MQClientInstance>();

            private MQClientManager() {
            }

            public static MQClientManager getInstance() {
                return instance;
            }
        }

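    factoryTable is a map that caches all client instances by clientId, and factoryIndexGenerator produces the sequence number assigned to each client instance that is created.

    The getAndCreateMQClientInstance method: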

        public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            String clientId = clientConfig.buildMQClientId();
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }

            return instance;
        }

    It first builds the clientId through the buildMQClientId method:

        public String buildMQClientId() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getClientIP());

            sb.append("@");
            sb.append(this.getInstanceName());
            if (!UtilAll.isBlank(this.unitName)) {
                sb.append("@");
                sb.append(this.unitName);
            }

            return sb.toString();
        }

    The clientId consists of the client's IP address and instance name, with unitName appended when one is set.
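
    To make the format concrete, here is a small sketch of the same concatenation with made-up input values. ClientIdSketch is a hypothetical class, and the blank check is a simplified stand-in for UtilAll.isBlank.

```java
// Hypothetical demo class; mirrors the string layout of buildMQClientId.
final class ClientIdSketch {

    /** Builds "ip@instanceName" and appends "@unitName" when unitName is not blank. */
    static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Example values only; the start method replaces the instance name via changeInstanceNameToPID.
        System.out.println(buildClientId("192.168.1.10", "12345", null));
        System.out.println(buildClientId("192.168.1.10", "12345", "unitA"));
    }
}
```

    Because the clientId is the cache key in factoryTable, two configurations that produce the same clientId within one JVM end up sharing a single MQClientInstance.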

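    With the generated clientId, it first checks the factoryTable cache for an existing client instance.
    If none is found, a new MQClientInstance has to be created.
    Note that clientConfig is not passed to the MQClientInstance directly; a copy is made through cloneClientConfig for safety, so that the client instance does not share a mutable configuration object with the caller: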

        public ClientConfig cloneClientConfig() {
            ClientConfig cc = new ClientConfig();
            cc.namesrvAddr = namesrvAddr;
            cc.clientIP = clientIP;
            cc.instanceName = instanceName;
            cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
            cc.pollNameServerInterval = pollNameServerInterval;
            cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
            cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
            cc.unitMode = unitMode;
            cc.unitName = unitName;
            cc.vipChannelEnabled = vipChannelEnabled;
            cc.useTLS = useTLS;
            cc.language = language;
            return cc;
        }
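
    The combination of a get-or-create cache and a defensive copy can be sketched with JDK classes only. Config and Client below are hypothetical stand-ins for ClientConfig and MQClientInstance, reduced to one field each.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for MQClientManager, ClientConfig and MQClientInstance.
final class ClientCacheSketch {
    static final class Config {
        String namesrvAddr;

        /** Field-by-field copy, in the spirit of cloneClientConfig. */
        Config copy() {
            Config c = new Config();
            c.namesrvAddr = namesrvAddr;
            return c;
        }
    }

    static final class Client {
        final Config config;
        final int index;

        Client(Config config, int index) {
            this.config = config;
            this.index = index;
        }
    }

    private final AtomicInteger indexGenerator = new AtomicInteger();
    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    /** Returns the cached client for clientId, creating it from a copy of config if absent. */
    Client getOrCreate(String clientId, Config config) {
        Client client = table.get(clientId);
        if (client == null) {
            client = new Client(config.copy(), indexGenerator.getAndIncrement());
            Client prev = table.putIfAbsent(clientId, client);
            if (prev != null) {
                client = prev; // another thread registered first; reuse its instance
            }
        }
        return client;
    }

    public static void main(String[] args) {
        ClientCacheSketch cache = new ClientCacheSketch();
        Config cfg = new Config();
        cfg.namesrvAddr = "127.0.0.1:9876";
        Client first = cache.getOrCreate("10.0.0.1@1234", cfg);
        cfg.namesrvAddr = "changed-later";
        System.out.println(first == cache.getOrCreate("10.0.0.1@1234", cfg));
        System.out.println(first.config.namesrvAddr);
    }
}
```

    putIfAbsent is what makes the check-then-create sequence safe without a lock: two threads may both construct a candidate, but only one is stored and both end up returning the stored one.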


    Creating the MQClientInstance:

        public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
            this.clientConfig = clientConfig;
            this.instanceIndex = instanceIndex;
            this.nettyClientConfig = new NettyClientConfig();
            this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
            this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
            this.clientRemotingProcessor = new ClientRemotingProcessor(this);
            this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

            if (this.clientConfig.getNamesrvAddr() != null) {
                this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
                log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
            }

            this.clientId = clientId;

            this.mQAdminImpl = new MQAdminImpl(this);

            this.pullMessageService = new PullMessageService(this);

            this.rebalanceService = new RebalanceService(this);

            this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
            this.defaultMQProducer.resetClientConfig(clientConfig);

            this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

            log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
                this.instanceIndex,
                this.clientId,
                this.clientConfig,
                MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
        }

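    The MQClientInstance constructor creates many objects; only the important ones are covered here.
    nettyClientConfig makes it clear that RocketMQ uses Netty for network I/O; it holds the Netty-related configuration.
    clientRemotingProcessor handles incoming messages; it is registered for several request codes in the MQClientAPIImpl constructor.

    mQClientAPIImpl is a very important part; an MQClientAPIImpl object is instantiated directly: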

        public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
            final ClientRemotingProcessor clientRemotingProcessor,
            RPCHook rpcHook, final ClientConfig clientConfig) {
            this.clientConfig = clientConfig;
            topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
            this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
            this.clientRemotingProcessor = clientRemotingProcessor;

            this.remotingClient.registerRPCHook(rpcHook);
            this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

            this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

            this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

            this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

            this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

            this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
        }

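    The constructor first creates a TopAddressing, which is used later to look up the Name Server address. Its default address is:

        http://jmenv.tbsite.net:8080/rocketmq/nsaddr

    This address can only be changed through system properties.

    Next it creates a NettyRemotingClient, which is the actual Netty client: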

        private final Bootstrap bootstrap = new Bootstrap();
        // Name Server address list
        private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();

        public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
            final ChannelEventListener channelEventListener) {
            super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
            this.nettyClientConfig = nettyClientConfig;
            this.channelEventListener = channelEventListener;

            int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
            if (publicThreadNums <= 0) {
                publicThreadNums = 4;
            }

            this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
                }
            });

            this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
                }
            });

            if (nettyClientConfig.isUseTLS()) {
                try {
                    sslContext = TlsHelper.buildSslContext(true);
                    log.info("SSL enabled for client");
                } catch (IOException e) {
                    log.error("Failed to create SSLContext", e);
                } catch (CertificateException e) {
                    log.error("Failed to create SSLContext", e);
                    throw new RuntimeException("Failed to create SSLContext", e);
                }
            }
        }

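    At this point the Netty client has only created its Bootstrap and set up and initialized the NioEventLoopGroup.

    Back in the MQClientInstance constructor: after the MQClientAPIImpl is created, getNamesrvAddr on clientConfig is used to check whether a Name Server address has been set. If it has, the address is applied through the updateNameServerAddressList method of mQClientAPIImpl:

    The updateNameServerAddressList method of MQClientAPIImpl: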

        public void updateNameServerAddressList(final String addrs) {
            String[] addrArray = addrs.split(";");
            List<String> list = Arrays.asList(addrArray);
            this.remotingClient.updateNameServerAddressList(list);
        }

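    Because the Name Server can be deployed as a cluster, the string is split on ";" to obtain all Name Server addresses, which are then passed to remotingClient. Here remotingClient is the NettyRemotingClient that was just created.
    The updateNameServerAddressList method of NettyRemotingClient: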

        public void updateNameServerAddressList(List<String> addrs) {
            List<String> old = this.namesrvAddrList.get();
            boolean update = false;

            if (!addrs.isEmpty()) {
                if (null == old) {
                    update = true;
                } else if (addrs.size() != old.size()) {
                    update = true;
                } else {
                    for (int i = 0; i < addrs.size() && !update; i++) {
                        if (!old.contains(addrs.get(i))) {
                            update = true;
                        }
                    }
                }

                if (update) {
                    Collections.shuffle(addrs);
                    log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
                    this.namesrvAddrList.set(addrs);
                }
            }
        }

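    The logic is straightforward: the stored Name Server list is replaced when the new list is non-empty and differs from the old one in size or contents, and the new list is shuffled before it is stored.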
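
    The two update methods together amount to "split on semicolons, compare with the stored list, replace if different". A compact sketch of that behaviour follows; NameServerListSketch is a hypothetical class, and it uses containsAll where the original uses an explicit loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class combining both updateNameServerAddressList methods.
final class NameServerListSketch {
    private final AtomicReference<List<String>> addrs = new AtomicReference<>();

    /** Parses "host:port;host:port" and stores the list if it differs; returns true when stored. */
    boolean update(String semicolonSeparated) {
        List<String> fresh = new ArrayList<>(Arrays.asList(semicolonSeparated.split(";")));
        List<String> old = addrs.get();
        boolean changed = !fresh.isEmpty()
            && (old == null || old.size() != fresh.size() || !old.containsAll(fresh));
        if (changed) {
            Collections.shuffle(fresh); // randomize the order before storing, as the original does
            addrs.set(fresh);
        }
        return changed;
    }

    List<String> current() {
        return addrs.get();
    }

    public static void main(String[] args) {
        NameServerListSketch list = new NameServerListSketch();
        System.out.println(list.update("10.0.0.1:9876;10.0.0.2:9876"));
        System.out.println(list.update("10.0.0.2:9876;10.0.0.1:9876"));
        System.out.println(list.current());
    }
}
```

    Note that the comparison ignores order, so passing the same addresses in a different order does not trigger an update.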

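    Back in the MQClientInstance constructor: after the steps above, it goes on to create MQAdminImpl, PullMessageService, RebalanceService, ConsumerStatsManager, and a new DefaultMQProducer. These are introduced later, when they come into play.

    Back in getAndCreateMQClientInstance of MQClientManager: once the MQClientInstance has been created, it is put into the cache.

    Back again in the start method of DefaultMQProducerImpl: after the MQClientInstance has been obtained, registerProducer is called.
    The registerProducer method of MQClientInstance: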

        public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
            if (null == group || null == producer) {
                return false;
            }

            MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
            if (prev != null) {
                log.warn("the producer group[{}] exist already.", group);
                return false;
            }

            return true;
        }

    When an MQClientInstance is initialized, it creates several important maps: producerTable, consumerTable, topicRouteTable, and brokerAddrTable.

        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
        private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
                new ConcurrentHashMap<String, HashMap<Long, String>>();

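    MQProducerInner is an interface and DefaultMQProducerImpl is its implementation; producerTable associates each DefaultMQProducerImpl with its group name as the key.
    In other words, the DefaultMQProducerImpl is cached by group here; the same applies to MQConsumerInner.
    topicRouteTable records the Broker and message queue information for each Topic.
    brokerAddrTable records the Broker addresses for each Broker Name.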
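
    Unlike the client cache, registration by group rejects a duplicate instead of reusing the existing entry. A minimal sketch of that rule, using Object in place of MQProducerInner and a hypothetical class name:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; Object stands in for MQProducerInner.
final class ProducerRegistrySketch {
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

    /** Registers producer under group; returns false for null arguments or an already used group. */
    boolean register(String group, Object producer) {
        if (group == null || producer == null) {
            return false; // ConcurrentHashMap would throw NullPointerException on null keys or values
        }
        return producerTable.putIfAbsent(group, producer) == null;
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        System.out.println(registry.register("order_producer", new Object()));
        System.out.println(registry.register("order_producer", new Object()));
    }
}
```

    This is why two producers with the same group name cannot be started on the same client instance: the second registration returns false and start throws an MQClientException.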

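    Back in the start method: after registerProducer returns, the next step depends on the return value registerOK.
    If registration failed, serviceState is reset to CREATE_JUST and an exception is thrown, so that a later call to start can proceed normally.

    If it succeeded, a TopicPublishInfo entry keyed by createTopicKey ("TBW102") is first added to topicPublishInfoTable.
    TopicPublishInfo: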

        public class TopicPublishInfo {
            private boolean orderTopic = false;
            private boolean haveTopicRouterInfo = false;
            private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
            private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        }

    messageQueueList holds the MessageQueue objects, and sendWhichQueue is used to obtain an index into messageQueueList, that is, the specific message queue to send to next.

    MessageQueue:

        public class MessageQueue implements Comparable<MessageQueue>, Serializable {
            private static final long serialVersionUID = 6191200464116433425L;
            private String topic;
            private String brokerName;
            private int queueId;

            public MessageQueue() {
            }

            public MessageQueue(String topic, String brokerName, int queueId) {
                this.topic = topic;
                this.brokerName = brokerName;
                this.queueId = queueId;
            }
        }

    This is a simple POJO that wraps topic, brokerName, and queueId.

    ThreadLocalIndex:

        public class ThreadLocalIndex {
            private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
            private final Random random = new Random();

            public int getAndIncrement() {
                Integer index = this.threadLocalIndex.get();
                if (null == index) {
                    index = Math.abs(random.nextInt());
                    if (index < 0)
                        index = 0;
                    this.threadLocalIndex.set(index);
                }

                index = Math.abs(index + 1);
                if (index < 0)
                    index = 0;

                this.threadLocalIndex.set(index);
                return index;
            }

            @Override
            public String toString() {
                return "ThreadLocalIndex{" +
                    "threadLocalIndex=" + threadLocalIndex.get() +
                    '}';
            }
        }

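    Through a ThreadLocal, each thread gets its own random starting value, which is incremented on every call. Later this value is taken modulo the size of messageQueueList to select a MessageQueue, and thus the actual message queue to send to.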
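
    The following sketch puts the counter and the modulo step side by side. It is an illustration, not the RocketMQ selection code: QueuePickerSketch and pick are hypothetical names, strings stand in for MessageQueue objects, and the Random is injected only so the demo can be seeded.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical demo class; it mirrors ThreadLocalIndex and adds the modulo step.
final class QueuePickerSketch {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    private final Random random;

    QueuePickerSketch(Random random) {
        this.random = random;
    }

    /** Per-thread counter: random non-negative start, then +1 per call, reset to 0 on overflow. */
    int getAndIncrement() {
        Integer index = threadLocalIndex.get();
        if (index == null) {
            index = Math.abs(random.nextInt());
            if (index < 0) { // Math.abs(Integer.MIN_VALUE) is still negative
                index = 0;
            }
        }
        index = Math.abs(index + 1);
        if (index < 0) { // Integer.MAX_VALUE + 1 overflows to Integer.MIN_VALUE
            index = 0;
        }
        threadLocalIndex.set(index);
        return index;
    }

    /** Picks the next queue for the calling thread in round-robin order. */
    <T> T pick(List<T> queues) {
        return queues.get(getAndIncrement() % queues.size());
    }

    public static void main(String[] args) {
        QueuePickerSketch picker = new QueuePickerSketch(new Random());
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (int i = 0; i < 6; i++) {
            System.out.println(picker.pick(queues));
        }
    }
}
```

    The random start spreads different threads across different queues, while the per-thread increment gives each thread a round-robin walk over the list. The two "index < 0" guards exist because Math.abs cannot make Integer.MIN_VALUE positive.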

    Back again in the start method of DefaultMQProducerImpl: after the entry for the createTopicKey topic has been added, startFactory decides whether the start method of mQClientFactory is called. startFactory is true by default here, so it is called:

    The start method of MQClientInstance:

        public void start() throws MQClientException {
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // Start various schedule tasks
                        this.startScheduledTask();
                        // Start pull service
                        this.pullMessageService.start();
                        // Start rebalance service
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }

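    When an MQClientInstance is created, its serviceState is also CREATE_JUST.

    It first checks whether a Name Server address has been set. If not, it tries to obtain one automatically through the fetchNameServerAddr method of MQClientAPIImpl.
    The fetchNameServerAddr method of MQClientAPIImpl: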

        public String fetchNameServerAddr() {
            try {
                String addrs = this.topAddressing.fetchNSAddr();
                if (addrs != null) {
                    if (!addrs.equals(this.nameSrvAddr)) {
                        log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                        this.updateNameServerAddressList(addrs);
                        this.nameSrvAddr = addrs;
                        return nameSrvAddr;
                    }
                }
            } catch (Exception e) {
                log.error("fetchNameServerAddr Exception", e);
            }
            return nameSrvAddr;
        }

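    It first obtains the Name Server address through the fetchNSAddr method of topAddressing. If an address is returned and it differs from the current nameSrvAddr, the Name Server list and nameSrvAddr are both updated.

    As mentioned earlier, the TopAddressing instance is created in the MQClientAPIImpl constructor.
    The fetchNSAddr method of TopAddressing: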

        public final String fetchNSAddr() {
            return fetchNSAddr(true, 3000);
        }

        public final String fetchNSAddr(boolean verbose, long timeoutMills) {
            String url = this.wsAddr;
            try {
                if (!UtilAll.isBlank(this.unitName)) {
                    url = url + "-" + this.unitName + "?nofix=1";
                }
                HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
                if (200 == result.code) {
                    String responseStr = result.content;
                    if (responseStr != null) {
                        return clearNewLine(responseStr);
                    } else {
                        log.error("fetch nameserver address is null");
                    }
                } else {
                    log.error("fetch nameserver address failed. statusCode=" + result.code);
                }
            } catch (IOException e) {
                if (verbose) {
                    log.error("fetch name server address exception", e);
                }
            }

            if (verbose) {
                String errorMsg =
                    "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
                errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);

                log.warn(errorMsg);
            }
            return null;
        }

    It first builds the URL from wsAddr and unitName. As mentioned earlier, wsAddr defaults to http://jmenv.tbsite.net:8080/rocketmq/nsaddr and can only be changed through system properties.
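
    A sketch of how the lookup URL might be assembled. The buildUrl part follows fetchNSAddr directly. The wsAddr part is an assumption about MixAll.getWSAddr, which this article does not show: the property names "rocketmq.namesrv.domain" and "rocketmq.namesrv.domain.subgroup" should be checked against the RocketMQ version in use. NsAddrUrlSketch is a hypothetical class.

```java
// Hypothetical demo class; the property names in wsAddr() are assumptions.
final class NsAddrUrlSketch {

    /** Assumed reconstruction of the default lookup address and its system-property overrides. */
    static String wsAddr() {
        String domain = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net");
        String subgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        return domain.indexOf(':') > 0
            ? "http://" + domain + "/rocketmq/" + subgroup        // domain already carries a port
            : "http://" + domain + ":8080/rocketmq/" + subgroup;  // default port 8080
    }

    /** Same rule as fetchNSAddr: append "-unitName?nofix=1" when a unit name is set. */
    static String buildUrl(String wsAddr, String unitName) {
        if (unitName == null || unitName.trim().isEmpty()) {
            return wsAddr;
        }
        return wsAddr + "-" + unitName + "?nofix=1";
    }

    public static void main(String[] args) {
        System.out.println(buildUrl(wsAddr(), null));
        System.out.println(buildUrl(wsAddr(), "unitA"));
    }
}
```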

    It then opens a connection through the httpGet method of HttpTinyClient and issues a GET request to obtain the Name Server address.
    The httpGet method of HttpTinyClient:

        static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
            String encoding, long readTimeoutMs) throws IOException {
            String encodedContent = encodingParams(paramValues, encoding);
            url += (null == encodedContent) ? "" : ("?" + encodedContent);

            HttpURLConnection conn = null;
            try {
                conn = (HttpURLConnection) new URL(url).openConnection();
                conn.setRequestMethod("GET");
                conn.setConnectTimeout((int) readTimeoutMs);
                conn.setReadTimeout((int) readTimeoutMs);
                setHeaders(conn, headers, encoding);

                conn.connect();
                int respCode = conn.getResponseCode();
                String resp = null;

                if (HttpURLConnection.HTTP_OK == respCode) {
                    resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
                } else {
                    resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
                }
                return new HttpResult(respCode, resp);
            } finally {
                if (conn != null) {
                    conn.disconnect();
                }
            }
        }

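    This uses the JDK's built-in HttpURLConnection to issue a GET request to the given URL. The response body and status code are wrapped in an HttpResult and returned to the caller, the fetchNSAddr method of TopAddressing. When the status code is 200, fetchNSAddr calls clearNewLine to strip unnecessary spaces, line feeds, and carriage returns from the body, which yields the Name Server address. Control then returns to fetchNameServerAddr, which updates the Name Server list. This completes the automatic Name Server lookup.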
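
    clearNewLine itself is not shown in the article. A plausible sketch of such a cleanup, assuming it trims the body and keeps only the first line, is given below; NsAddrResponseSketch and firstLine are hypothetical names.

```java
// Hypothetical demo class; assumed behaviour: trim, then cut at the first line break.
final class NsAddrResponseSketch {

    /** Returns the trimmed body up to (not including) the first carriage return or line feed. */
    static String firstLine(String body) {
        String s = body.trim();
        int cr = s.indexOf('\r');
        if (cr != -1) {
            return s.substring(0, cr);
        }
        int lf = s.indexOf('\n');
        if (lf != -1) {
            return s.substring(0, lf);
        }
        return s;
    }

    public static void main(String[] args) {
        String body = "  10.0.0.1:9876;10.0.0.2:9876\r\n";
        String addrs = firstLine(body);
        // The cleaned value is then split on ";" by updateNameServerAddressList.
        for (String addr : addrs.split(";")) {
            System.out.println(addr);
        }
    }
}
```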

    Back in the start method of MQClientInstance:
    Once a Name Server address is available, it first calls the start method of mQClientAPIImpl.
    The start method of MQClientAPIImpl:

        public void start() {
            this.remotingClient.start();
        }

    This actually calls the start method of the Netty client created earlier.
    The start method of NettyRemotingClient:

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

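    Here the Bootstrap is bound to the EventLoopGroup created earlier and to the channel handlers. The method also schedules a timer that calls scanResponseTable once per second after an initial 3-second delay.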
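
    The anonymous ThreadFactory in this method names its worker threads with a prefix and an incrementing counter. A standalone sketch of that pattern, using only the JDK (the class name NamedThreadFactory is hypothetical and not part of RocketMQ):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    // Thread-safe counter, so concurrent callers never get the same suffix.
    private final AtomicInteger index = new AtomicInteger(0);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // The first thread gets suffix 1, as in the listing above.
        return new Thread(r, prefix + index.incrementAndGet());
    }
}
```

    Named threads make it easier to tell the client's worker threads apart in a thread dump.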


    After mQClientAPIImpl's start method completes, startScheduledTask is called to start the scheduled tasks.
    startScheduledTask method:

    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    As shown, five scheduled tasks are set up in total.
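
    Every task body in the listing is wrapped in try/catch. That matters because ScheduledExecutorService.scheduleAtFixedRate suppresses all subsequent executions once a run throws. A minimal JDK-only sketch of the guard follows; the class and method names are hypothetical, and the 10 ms period is chosen only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class GuardedTaskSketch {
    // Returns true if a periodic task that fails on every run still executes `runs` times.
    static boolean keepsRunning(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    latch.countDown();
                    throw new IllegalStateException("simulated failure");
                } catch (Exception e) {
                    // Swallowed here (the original logs it), so the next run is still scheduled.
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

    Without the inner catch, the first exception would silently end the periodic task.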

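    ① If no name server address (namesrvAddr) is configured, call the fetchNameServerAddr method described above to refresh the name server addresses periodically (every 2 minutes, after an initial 10-second delay)

    ② Periodically refresh the route information of each Topic through updateTopicRouteInfoFromNameServer: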

    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();

        // Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }

        // Producer
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    Set<String> lst = impl.getPublishTopicList();
                    topicList.addAll(lst);
                }
            }
        }

        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }

    The Topics of every Consumer and Producer are collected into topicList, and the single-topic overload of updateTopicRouteInfoFromNameServer is called for each one

    updateTopicRouteInfoFromNameServer method:

    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
            DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }

    It first calls mQClientAPIImpl's getTopicRouteInfoFromNameServer method to fetch the route information for the Topic from the name server

    The route information of a Topic is encapsulated in TopicRouteData:

    public class TopicRouteData extends RemotingSerializable {
        private String orderTopicConf;
        private List<QueueData> queueDatas;
        private List<BrokerData> brokerDatas;
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    }

    QueueData:

    public class QueueData implements Comparable<QueueData> {
        private String brokerName;
        private int readQueueNums;
        private int writeQueueNums;
        private int perm;
        private int topicSynFlag;
    }

    BrokerData:

    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;
        private String brokerName;
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    }


    getTopicRouteInfoFromNameServer method:

    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
            throws RemotingException, MQClientException, InterruptedException {
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }

    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
            boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
                // A null body falls through to default and ends in the exception below.
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

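    This mainly uses the invokeSync method of remotingClient, i.e. the Netty client, to send the prepared request to the name server and obtain the response
    The name server looks up the Broker route information related to the Topic and returns it as the response, which is decoded here into a TopicRouteData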

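    invokeSync obtains its Channel lazily: it first tries to reuse a Channel whose connection is already established; if none exists, it calls the bootstrap's connect method to create the connection, which yields a ChannelFuture from which the Channel is obtained and cached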
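
    The lazy get-or-connect idea can be sketched without Netty. In the sketch below, FakeChannel is a hypothetical stand-in for a connected Channel, the connects counter stands in for the bootstrap's connect call, and ConcurrentHashMap.computeIfAbsent is swapped in for whatever locking the real client uses, so it shows the caching pattern only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyChannelCache {
    // Hypothetical stand-in for a connected Netty Channel.
    static final class FakeChannel {
        final String addr;

        FakeChannel(String addr) {
            this.addr = addr;
        }
    }

    private final ConcurrentMap<String, FakeChannel> channels = new ConcurrentHashMap<>();
    // Counts how many times a "connection" was actually established.
    final AtomicInteger connects = new AtomicInteger();

    FakeChannel getOrCreate(String addr) {
        // Connect only on the first request for an address; later calls reuse the cached channel.
        return channels.computeIfAbsent(addr, a -> {
            connects.incrementAndGet();
            return new FakeChannel(a);
        });
    }
}
```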

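    Back in updateTopicRouteInfoFromNameServer: once the Topic's route information has been fetched from the name server, topicRouteDataIsChange compares it with the route information previously stored in topicRouteTable
    topicRouteDataIsChange method: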

    private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
        if (olddata == null || nowdata == null)
            return true;
        TopicRouteData old = olddata.cloneTopicRouteData();
        TopicRouteData now = nowdata.cloneTopicRouteData();
        Collections.sort(old.getQueueDatas());
        Collections.sort(old.getBrokerDatas());
        Collections.sort(now.getQueueDatas());
        Collections.sort(now.getBrokerDatas());
        return !old.equals(now);
    }

    Even if nothing has changed, isNeedUpdateTopicRouteInfo is still called to check whether an update is needed

    isNeedUpdateTopicRouteInfo method:

    private boolean isNeedUpdateTopicRouteInfo(final String topic) {
        boolean result = false;
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext() && !result) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    result = impl.isPublishTopicNeedUpdate(topic);
                }
            }
        }

        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext() && !result) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    result = impl.isSubscribeTopicNeedUpdate(topic);
                }
            }
        }

        return result;
    }

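    It checks every producer, and then every consumer, for whether its route information for this Topic needs updating, stopping at the first one that does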
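
    The two-stage decision (order-insensitive comparison first, then asking the clients) can be sketched with plain lists. This is a simplified illustration: broker names as strings stand in for the QueueData and BrokerData lists, and all names in the class are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RouteChangeSketch {
    // Sort copies before comparing, so a different ordering alone is not a change.
    static boolean changed(List<String> oldBrokers, List<String> newBrokers) {
        if (oldBrokers == null || newBrokers == null) {
            return true;
        }
        List<String> a = new ArrayList<>(oldBrokers);
        List<String> b = new ArrayList<>(newBrokers);
        Collections.sort(a);
        Collections.sort(b);
        return !a.equals(b);
    }

    // Update when the data differs, or when any producer or consumer still asks for it.
    static boolean shouldUpdate(List<String> oldBrokers, List<String> newBrokers,
            boolean anyClientNeedsUpdate) {
        return changed(oldBrokers, newBrokers) || anyClientNeedsUpdate;
    }
}
```

    Sorting copies rather than the originals leaves the caller's lists untouched, which mirrors the cloning in topicRouteDataIsChange.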

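    When an update is needed, updateTopicRouteInfoFromNameServer proceeds as follows:
    It first takes the BrokerData, i.e. the Brokers' route information, out of topicRouteData and updates brokerAddrTable
    It then derives the producers' publish information and the consumers' subscribe information from topicRouteData and updates each of them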
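
    Deriving publish information from route data amounts to expanding each Broker's queue count into individual queues. A deliberately simplified sketch follows: QueueInfo is a hypothetical cut-down QueueData, queues are represented as strings, and the real conversion applies further checks that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

final class PublishQueueSketch {
    // Hypothetical, cut-down version of QueueData.
    static final class QueueInfo {
        final String brokerName;
        final int writeQueueNums;

        QueueInfo(String brokerName, int writeQueueNums) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // One entry per (topic, broker, queueId), with queueId running from 0 to writeQueueNums - 1.
    static List<String> toWriteQueues(String topic, List<QueueInfo> queueInfos) {
        List<String> queues = new ArrayList<>();
        for (QueueInfo q : queueInfos) {
            for (int i = 0; i < q.writeQueueNums; i++) {
                queues.add(topic + "@" + q.brokerName + "#" + i);
            }
        }
        return queues;
    }
}
```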

    ③ Periodically remove offline Brokers and send heartbeats to the Brokers that are currently online
    cleanOfflineBroker removes offline Brokers:

    private void cleanOfflineBroker() {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
                try {
                    ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();

                    Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerTable.hasNext()) {
                        Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                        String brokerName = entry.getKey();
                        HashMap<Long, String> oneTable = entry.getValue();

                        HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                        cloneAddrTable.putAll(oneTable);

                        Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> ee = it.next();
                            String addr = ee.getValue();
                            if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                                it.remove();
                                log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                            }
                        }

                        if (cloneAddrTable.isEmpty()) {
                            itBrokerTable.remove();
                            log.info("the broker[{}] name's host is offline, remove it", brokerName);
                        } else {
                            updatedTable.put(brokerName, cloneAddrTable);
                        }
                    }

                    if (!updatedTable.isEmpty()) {
                        this.brokerAddrTable.putAll(updatedTable);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
        } catch (InterruptedException e) {
            log.warn("cleanOfflineBroker Exception", e);
        }
    }

    The brokerAddrTable used here is kept up to date by the scheduled task in ②. The method iterates over every Broker entry in it and checks each address with isBrokerAddrExistInTopicRouteTable:

    private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
        Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, TopicRouteData> entry = it.next();
            TopicRouteData topicRouteData = entry.getValue();
            List<BrokerData> bds = topicRouteData.getBrokerDatas();
            for (BrokerData bd : bds) {
                if (bd.getBrokerAddrs() != null) {
                    boolean exist = bd.getBrokerAddrs().containsValue(addr);
                    if (exist)
                        return true;
                }
            }
        }

        return false;
    }

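    The check compares the address against the BrokerAddrs stored in every TopicRouteData in topicRouteTable. An address that no longer appears there is treated as offline and removed, and brokerAddrTable is updated accordingly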
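
    The pruning logic can be reproduced in isolation. In this sketch the set liveAddrs is a hypothetical replacement for the isBrokerAddrExistInTopicRouteTable lookup, and the locking is left out; only the remove-or-replace behaviour is shown.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

final class OfflineBrokerSketch {
    // brokerAddrTable: brokerName -> (brokerId -> address).
    // liveAddrs: addresses that still appear in some topic's route data (assumed input).
    static void clean(Map<String, Map<Long, String>> brokerAddrTable, Set<String> liveAddrs) {
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            // Work on a copy so the published map is replaced rather than mutated in place.
            Map<Long, String> copy = new HashMap<>(entry.getValue());
            copy.values().removeIf(addr -> !liveAddrs.contains(addr));
            if (copy.isEmpty()) {
                it.remove();          // no address left: drop the whole broker
            } else {
                entry.setValue(copy); // keep only the surviving addresses
            }
        }
    }
}
```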

    sendHeartbeatToAllBrokerWithLock periodically sends heartbeats to the Brokers:

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

    This part is not covered in detail here; the heartbeat is likewise sent through the Netty client
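
    The tryLock guard in sendHeartbeatToAllBrokerWithLock never blocks: if another thread is already sending heartbeats, the call is skipped. A minimal sketch of that guard, assuming a ReentrantLock and using a counter in place of the actual heartbeat (the class, field, and method names are hypothetical):

```java
import java.util.concurrent.locks.ReentrantLock;

final class HeartbeatGuardSketch {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    // Number of heartbeat rounds actually performed.
    int sent = 0;

    // Returns true if this call did the work, false if it was skipped.
    boolean sendWithLock() {
        // tryLock() without a timeout returns immediately instead of waiting.
        if (lockHeartbeat.tryLock()) {
            try {
                sent++; // stands in for sending the heartbeat
                return true;
            } finally {
                lockHeartbeat.unlock();
            }
        }
        return false;
    }
}
```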

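    ④ Periodically persist the consumption progress (offsets) of the consumer queues; this is covered in detail in the Consumer analysis

    ⑤ Periodically adjust the size of the consumer-side thread pool; this is also covered in the Consumer analysis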

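    With the five scheduled tasks created by startScheduledTask in place, return to MQClientInstance's start method
    Next it starts pullMessageService, which pulls messages for consumers
    Then it starts rebalanceService, which balances the message queues
    Both services are covered in the Consumer analysis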

    Next, the call:

    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

    starts the push service
    The defaultMQProducer here was created earlier in MQClientInstance's constructor:

    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);

    The difference is that its start method is called with false, so mQClientFactory's start method is not invoked again
    Its purpose is covered later

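    At this point DefaultMQProducerImpl's start method is essentially complete. As its final step, it sends one heartbeat to all Brokers through mQClientFactory's sendHeartbeatToAllBrokerWithLock method

    This completes the Producer startup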

  • Original article: https://www.cnblogs.com/a526583280/p/11273769.html