zoukankan      html  css  js  c++  java
  • 本地启动 NameServer 和 Broker | 读 RocketMQ 源码前的准备工作

    1. clone 并导入源码
    2. 本地启动 NameServer
    3. 本地启动 Broker
    4. 本地运行生产者与消费者代码

    完成上述步骤之后,RocketMQ的源码环境就搭建完毕了,之后就可以在本地启动以及收发消息,调试和分析RocketMQ的源码了。

    clone 并导入源码

    在 github 上选择对应的的代码 https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0,将其 clone 下来,再切出 4.7.0 版本的源码。Clone 到本地之后,用 IDEA 打开项目。

    clone代码

    项目结构

    目录结构

    模块 作用
    broker Broker 相关代码
    client Producer、Consumer 客户端代码,用于生产消息、消费消息
    common 公共代码
    dev 开发相关的信息
    distribution 部署相关,比如配置文件
    example 例子
    filter 过滤器
    logappender 日志相关
    logging 日志相关
    namesvr NameServer
    openmessaging 开放消息标准
    remoting 远程网络通信,基于 netty 实现
    srvutil 工具类
    store 消息如何在 Broker 中进行存储相关代码
    style 代码检查
    test 测试
    tools 命令行监控

    本地启动 NameServer

    接下来我们要做的是在本地启动 NameServer,包括两个步骤:

    1. 在 IDEA 中配置启动相关的信息,NameServer 的启动类是org.apache.rocketmq.namesrv.NamesrvStartup
    2. 准备好启动 NameServer 需要的配置文件和目录

    看上图:

    1. 配置启动类的名字 NameServerStartup
    2. 配置主类的路径 org.apache.rocketmq.namesrv.NamesrvStartup
    3. 工作目录,也就是当前代码所在的目录
    4. 运行目录 ROCKETMQ_HOME,这个目录里面放的是运行时需要的配置文件、数据、日志等。你需要创建一个目录,在里面创建 conflogsstore目录

    接着将源码中 distrbution 模块中的 logback_namesvr.xml 文件拷贝到上面的 conf 目录下,并将这个文件中的${user.home}全部替换为前面配置的运行目录。

    然后运行配置好的启动类,就会读取 conf 里的配置文件,并将日志打印在logs目录里,数据都会写在store目录里。看到 IDEA 的打印出下面这样的信息,就说明 NameServer 启动成功了。

    本地运行 Broker

    启动 Broker 和启动 NameServer 的过程类似。首先也是配置启动类:

    1. Broker 的启动类在 org.apache.rocketmq.broker.BrokerStartup
    2. 不一样的地方是要设置一个参数 -c你的broker.conf配置文件的路径,因为程序启动的时候会读-c这个参数
    3. 接着还是设置工作目录和运行目录,选择 module 为 rocketmq-broker

    接着把distrbution模块中的 broker.conflogback_broker.xml 文件拷贝到 conf目录下:

    1. 将 logback_broker.xml 的${user.home}替换为你的 RocketMQ 运行目录
    2. broker.conf 按照下面的配置方式进行配置
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    # nameserver的地址
    namesrvAddr=127.0.0.1:9876
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    # 运行目录的store目录
    storePathRootDir=/Users/shui/Desktop/rocketmq-nameserver/store
    # commitLog的存储路径
    storePathCommitLog=你的store目录/commitlog
    # consume queue文件的存储路径
    storePathConsumeQueue=你的store目录/consumequeue
    # 消息索引文件的存储路径
    storePathIndex=你的store目录/store/index
    # checkpoint文件的存储路径
    storeCheckpoint=你的store目录/checkpoint
    # abort文件的存储路径
    abortFile=你的store目录/abort
    

    最后运行主类,看到控制台打印如下信息就表示启动成功:

    此时 rocketmqlogs,里面有一个broker.log,就可以看到Broker的启动日志了:

    本地运行生产者与消费者代码

    在控制台创建一个 topic 名为 TopicTest。如果不知道如何使用 RocketMQ 的控制台,可以看我之前写这篇文章:https://www.cnblogs.com/shuiyj/p/13200658.html。

    接着去修改 example 中给出的生产者和消费者代码 org.apache.rocketmq.example.quickstart.Consumerorg.apache.rocketmq.example.quickstart.Producer

    生产者

    改动两个地方:

    1. 设置 NameServer 地址,让生产者可以获取到 Broker 地址
    2. 本来发送 1000 条信息,改少一点发送 3 条,便于消费的时候观察
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            /*
             * Instantiate with a producer group name.
             */
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
            // 其他代码不变
          	// 在这里设置 NameServer 地址,保证  Producer 可以从 NameServer 获取到 Broker 地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            /*
             * Launch the instance.
             */
            producer.start();
    	     	
          // 本来是发送 1000 条消息,改成发送 3 条
            for (int i = 0; i < 3; i++) {
                try {
    
                    /*
                     * Create a message instance, specifying topic, tag and message body.
                     */
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
    
    

    看到控制台输出如下所示的信息,表示消息发送成功了。

    SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, offsetMsgId=C0A8010800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, offsetMsgId=C0A8010800002A9F00000000000000CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, offsetMsgId=C0A8010800002A9F0000000000000194, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
    

    消费者

    消息者只改动一个地方,就是设置 NameServer 地址,也是为了获取到 Broker 的地址。

    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 省略其它代码...
    
            // 设置 NameServer 地址,保证  Consumer 可以从 NameServer 获取到 Broker 地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
            /*
             *  Launch the consumer instance.
             */
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    

    可以看到消费到了 3 条数据,并打印出了消息的相关信息。

    00:24:23.571 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    Consumer Started.
    ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869675, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869676, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F00000000000000CA, commitLogOffset=202, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064336, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
    ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869678, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869679, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000194, commitLogOffset=404, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064339, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
    ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869552, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869574, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064340, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
    
  • 相关阅读:
    关于sql json数据的处理
    时间函数strtotime的强大
    /usr/bin/install: cannot create regular file `/usr/local/jpeg6/include/jconfig.h'
    linux安装php7.2.7
    关于sql时间方面的处理
    关于centos防火墙的一些问题
    linux 安装ssl 失败原因
    linux安装php7.2.7
    拾取坐标和反查询接口api
    【转】通过点击获取地址等信息、可以传值
  • 原文地址:https://www.cnblogs.com/shuiyj/p/13215978.html
Copyright © 2011-2022 走看看