zoukankan      html  css  js  c++  java
  • RocketMQ源码分析之Producer启动(四)

    我们主要分析图中红色矩形区域,最上面RemotingClient属于RocketMQ-remoting内容,以后再分析。从图中也可以清晰的看到RocketMQ底层通信协议使用了Netty(netty-all-4.0.42.Final版本)。

    言归正传,红色矩形局域里面大家看调用关系就可以看出来,MQClientInstance是非常核心的一个类,它起着承上启下的衔接作用。MQClientInstance是Producer与MQ交互的实例,一个client只有一个MQClientInstance实例。它包含了topic路由,broker地址等信息,同时负责启动底层通信服务(MQClientAPIImpl)、定时任务服务(ScheduledExecutorService)、拉取消息服务(PullMessageService),负载均衡服务(RebalanceService)等等。

    下面我们通过一个Producer启动时序图,看看完整的调用过程。

     

    1、MyProducer是我们自己的生产者,主要代码如下:

    1 MQProducer producer = new DefaultMQProducer("MyGroup");//创建一个默认生产者
    2 producer.setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");//设置NameServer地址,多个地址分号分隔
    3 producer.start();//启动
    4 
    5 ......
    6 
    7 producer.shutdown();//关闭

    new一个DefaultMQProducer对象,设置注册中心地址,然后启动。

    DefaultMQProducer构造器,从DefaultMQProducer构造器可以看出Producer的主要业务逻辑是在DefaultMQProducerImpl中完成的。

    1 public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    2     this.producerGroup = producerGroup;
    3     defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    4 }
    NamesrvAddr赋值有三种方式:
    • 通过setNamesrvAddr方法设置
    • 通过系统环境变量设置
    • 通过fetchNameServerAddr()方法调用http接口去寻址,需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址。

    启动时查找NamesrvAddr的优先级:setNamesrvAddr()-->环境变量-->fetchNameServerAddr()


    ClientConfig类中有这样一行代码,对namesrvAddr进行了初始化,可以看到默认值确实是从系统环境变量里取的。
    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

    2、DefaultMQProducer类

    DefaultMQProducer构造器我们上面讲过了,就是给producerGroup赋值,并创建具体实现类DefaultMQProducerImpl对象。
    DefaultMQProducer.start()方法:start方法直接调用了DefaultMQProducerImpl的start()方法。
    1     @Override
    2     public void start() throws MQClientException {
    3         this.defaultMQProducerImpl.start();
    4     }

    3、DefaultMQProducerImpl类(重点),省略了异常,日志输出代码。

     1     public void start(final boolean startFactory) throws MQClientException {
     2         switch (this.serviceState) {
     3             case CREATE_JUST:
     4                 this.serviceState = ServiceState.START_FAILED;
     5                 //检查producerGroup是否合法
     6                 this.checkConfig();
     7 
     8                 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
     9                     this.defaultMQProducer.changeInstanceNameToPID();
    10                 }
    11 
    12                 //获取MQClientInstance实例,注意单例模式,一个JVM内只能有一个MQClientInstance实例。
    13                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
    14                 //注册Producer
    15                 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    16                 if (!registerOK) {
    17                     this.serviceState = ServiceState.CREATE_JUST;
    18                     throw new MQClientException();
    19                 }
    20 
    21                 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    22 
    23                 if (startFactory) {
    24                     //调用MQClientInstance start方法
    25                     mQClientFactory.start();
    26                 }
    27 
    28                 this.serviceState = ServiceState.RUNNING;
    29                 break;
    30             case RUNNING:
    31             case START_FAILED:
    32             case SHUTDOWN_ALREADY:
    33                 throw new MQClientException()
    34             default:
    35                 break;
    36         }
    37         //向所有Broker发送心跳
    38         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    39     }

    第六行checkConfig方法检查producerGroup是否合法,主要有以下几个检查点:

    • groupName不能为空
    • 非法字符检查
    • 最大长度不超过255字节
    • 自定义的groupName不能定义为"DEFAULT_PRODUCER",因为DEFAULT_PRODUCER是RocketMQ默认的分组名称

    MQClientManager.getAndCreateMQClientInstance方法代码:

     1     public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
     2         //生成clientId,规则:{ip}@{instanceName}
     3         String clientId = clientConfig.buildMQClientId();
     4         MQClientInstance instance = this.factoryTable.get(clientId);
     5         //如果instance存在直接返回,否则创建一个新的MQClientInstance对象。
     6         if (null == instance) {
     7             instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
     8             //如果传入的key存在,就返回已存在的value
     9             MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
    10             if (prev != null) {
    11                 instance = prev;
    12                 log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    13             } else {
    14                 log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    15             }
    16         }
    17 
    18         return instance;
    19     }

    getAndCreateMQClientInstance()中心思想就是要返回一个MQClientInstance对象。方法里使用了双重判断,这么没什么好说的。

    注意ConcurrentHashMap的putIfAbsent用法,有3年以上编程经验的人必须要知道。

    4、MQClientInstance.registerProducer()方法

    这个方法名起得有点误导人,让人以为是要把Producer注册到NameServer上。其实只是把当前的Producer放到ConcurrentHashMap中,key是groupName,value是producer对象。代码如下:

     1     public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
     2         if (null == group || null == producer) {
     3             return false;
     4         }
     5 
     6         MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
     7         if (prev != null) {
     8             log.warn("the producer group[{}] exist already.", group);
     9             return false;
    10         }
    11 
    12         return true;
    13     }

    5、MQClientInstance.start()方法,贴出来的代码省略了switch

     1 public void start() throws MQClientException {
     2     synchronized (this) {
     3         this.serviceState = ServiceState.START_FAILED;
     4         // 如果没有namesrvAddr则去查找,fetchNameServerAddr方法下面再详细说
     5         if (null == this.clientConfig.getNamesrvAddr()) {
     6             this.mQClientAPIImpl.fetchNameServerAddr();
     7         }
     8         // 启动请求相应通道,打开channel
     9         this.mQClientAPIImpl.start();
    10         // 启动定时任务
    11         this.startScheduledTask();
    12         // 启动拉取消息服务
    13         this.pullMessageService.start();
    14         // 启动负载均衡服务
    15         this.rebalanceService.start();
    16         // 启动消息推送服务,注意这里又回去调用了DefaultMQProducerImpl的start方法,但是参数是false。
    17         this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    18         this.serviceState = ServiceState.RUNNING;
    19     }
    20 }

    6、MQClientAPIImpl.fetchNameServerAddr()方法

    从下面的源码可以清楚的看到寻址方法最后调用调用http接口去寻址,前提需配置hosts信息,客户端默认每隔两分钟去访问一次这个http地址,并更新本地namesrvAddr地址。

     1     public String fetchNameServerAddr() {
     2         try {
     3             String addrs = this.topAddressing.fetchNSAddr();
     4             if (addrs != null) {
     5                 if (!addrs.equals(this.nameSrvAddr)) {
     6                     this.updateNameServerAddressList(addrs);
     7                     this.nameSrvAddr = addrs;
     8                     return nameSrvAddr;
     9                 }
    10             }
    11         } catch (Exception e) {
    12         }
    13         return nameSrvAddr;
    14     }
    15 
    16     public final String fetchNSAddr() {
    17         return fetchNSAddr(true, 3000);
    18     }
    19 
    20     public final String fetchNSAddr(boolean verbose, long timeoutMills) {
    21         String url = this.wsAddr;
    22         if (!UtilAll.isBlank(this.unitName)) {
    23             url = url + "-" + this.unitName + "?nofix=1";
    24         }
    25         HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
    26         if (200 == result.code) {
    27             String responseStr = result.content;
    28             if (responseStr != null) {
    29                 return clearNewLine(responseStr);
    30             } else {
    31                 log.error("fetch nameserver address is null");
    32             }
    33         } else {
    34             log.error("fetch nameserver address failed. statusCode=" + result.code);
    35         }
    36         return null;
    37     }

    7、MQClientAPIImpl.start()方法

    建立底层通信channel,MQClientAPIImpl.start方法最后调用RemotingClient的start方法,而remotingClient调用了netty建立底层通信。

     1     public void start() {
     2         this.remotingClient.start();
     3     }
     4 
     5     @Override
     6     public void start() {
     7         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
     8             nettyClientConfig.getClientWorkerThreads(),
     9             new ThreadFactory() {
    10                 private AtomicInteger threadIndex = new AtomicInteger(0);
    11                 @Override
    12                 public Thread newThread(Runnable r) {
    13                     return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
    14                 }
    15             });
    16 
    17         Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
    18             .option(ChannelOption.TCP_NODELAY, true)
    19             .option(ChannelOption.SO_KEEPALIVE, false)
    20             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    21             .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
    22             .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
    23             .handler(new ChannelInitializer<SocketChannel>() {
    24                 @Override
    25                 public void initChannel(SocketChannel ch) throws Exception {
    26                     ChannelPipeline pipeline = ch.pipeline();
    27                     if (nettyClientConfig.isUseTLS()) {
    28                         if (null != sslContext) {
    29                             pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
    30                             log.info("Prepend SSL handler");
    31                         } else {
    32                             log.warn("Connections are insecure as SSLContext is null!");
    33                         }
    34                     }
    35                     pipeline.addLast(
    36                         defaultEventExecutorGroup,
    37                         new NettyEncoder(),
    38                         new NettyDecoder(),
    39                         new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
    40                         new NettyConnectManageHandler(),
    41                         new NettyClientHandler());
    42                 }
    43             });
    44 
    45         this.timer.scheduleAtFixedRate(new TimerTask() {
    46             @Override
    47             public void run() {
    48                 try {
    49                     NettyRemotingClient.this.scanResponseTable();
    50                 } catch (Throwable e) {
    51                     log.error("scanResponseTable exception", e);
    52                 }
    53             }
    54         }, 1000 * 3, 1000);
    55 
    56         if (this.channelEventListener != null) {
    57             this.nettyEventExecutor.start();
    58         }
    59     }

    8、MQClientInstance.startScheduledTask()方法,省略定时代码

     1     private void startScheduledTask() {
     2         //如果当前客户端没有指定setNamesrvAddr,启动查找NamesrvAddr地址服务,每两分钟一次
     3         MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
     4 
     5         //从NameServerh获取topic信息后,更新客户端topic路由信息
     6         MQClientInstance.this.updateTopicRouteInfoFromNameServer();
     7 
     8         //定时清理已经不存在的broker服务
     9         MQClientInstance.this.cleanOfflineBroker();
           //定时发送心跳服务
    10 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 11 12 //定时做consumer offset持久化  13 MQClientInstance.this.persistAllConsumerOffset(); 14 15 //定时调整消费线程池 16 MQClientInstance.this.adjustThreadPool(); 17 }

    9、PullMessageService、RebalanceService

    这两个类都是Consumer端用的,这里先不分析了。有的人可能会有疑问,怎么讲Producer启动,里面还有Consumer的调用。很简单,因为Producer和Consumer相对于Broker都是客户端,两者都会调用MQClientInstance。

    一个西瓜圆又圆......打完收工!



  • 相关阅读:
    加解密相关
    Office常见问题及解决方法
    文件校验和(checksum或Hash)计算工具
    linux install mpi4py
    [zz] Install VSFTP
    Bring up a website by wordpress
    How to Get Rid of /wordpress/ From your WordPress Site URL
    Change http port in bitnami stack
    mongodb gdal 矢量数据格式驱动
    配置PostgreSQL Streaming Replication集群
  • 原文地址:https://www.cnblogs.com/shileibrave/p/9884407.html
Copyright © 2011-2022 走看看