zoukankan      html  css  js  c++  java
  • RocketMQ入门、安装、详解、*配置

    version 2.0 【更新于 2020.03.20】

    • 本次更新重点,主从同步!!!(很多小伙伴出现主从不同步的问题)
    • 主从同步关注点(详见配置说明):
      • 1. brokerRole
      • 2. brokerIP2
      • 3. 重启程序顺序
    • 官方压缩包由tar.gz改为zip
    • bin目录下多了一个文件夹,从而grep命令参数多了--exclude-dir
    • 注意 nameserver 和 master 以及 slave 的区别
    • 注意重启方式,不要使用kill -9
    • nameserver集群、broker(单master/单slave;多master/多slave;单master/多slave)

    下载&安装

    下载地址http://rocketmq.apache.org/docs/quick-start/

    解压 tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz

    解压 unzip rocketmq-all-4.7.0-bin-release.zip【version 2.0】 

    进入HOME目录 cd rocketmq-all-4.4.0-bin-release

    开启端口 9876/9876 10909/10912 验证 netstat -nltp

    修改配置

    启动脚本内存配置

    cd bin

    grep "Xmx" *

    grep 'Xmx' * --exclude-dir="*" 【version 2.0】

    grep 'MaxDirectMemorySize' * --exclude-dir="*" 【version 2.0】

    vim runbroker.sh runserver.sh vim tools.sh

     JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"

    改为 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"

    Broker配置 【master】

    vim conf/broker.conf

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    # 0 表示master,大于0 表示slave
    brokerId = 0
    # 服务节点/注册中心
    namesrvAddr=注册中心IP:9876
    # 多服务节点/注册中心,使用分号【;】
    #namesrvAddr=注册中心IP:9876;注册中心IP:9876;注册中心IP:9876
    # Broker服务地址
    brokerIP1=当前机器IP
    # BrokerHAIP地址,供slave同步消息的地址
    brokerIP2=当前机器IP
    # 删除时间【时】此处表示凌晨4点
    deleteWhen = 04
    # 数据存储时间【时】 此处表示48小时
    fileReservedTime = 48
    # SYNC_MASTER(同步双写) 、ASYNC_MASTER(异步复制) 、SLAVE
    brokerRole = SYNC_MASTER
    # SYNC_FLUSH(同步刷盘) 和ASYNC_FLUSH(异步刷盘),写磁盘
    flushDiskType = SYNC_FLUSH
    # 是否自动创建topic 线上改为false 测试true
    autoCreateTopicEnable=true

    Broker配置 【slave】

    vim conf/broker.conf

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    # 0 表示master,大于0 表示slave
    brokerId = 1
    # 服务节点/注册中心
    namesrvAddr=注册中心IP:9876
    # 多服务节点/注册中心,使用分号【;】
    #namesrvAddr=注册中心IP:9876;注册中心IP:9876;注册中心IP:9876
    # Broker服务地址
    brokerIP1=当前机器IP
    # BrokerHAIP地址,供slave同步消息的地址
    brokerIP2=当前机器IP
    # 删除时间【时】此处表示凌晨4点
    deleteWhen = 04
    # 数据存储时间【时】 此处表示48小时
    fileReservedTime = 48
    # SYNC_MASTER(同步双写) 、ASYNC_MASTER(异步复制) 、SLAVE
    brokerRole = SLAVE
    # SYNC_FLUSH(同步刷盘) 和ASYNC_FLUSH(异步刷盘),写磁盘
    flushDiskType = SYNC_FLUSH
    # 是否自动创建topic 线上改为false 测试true
    autoCreateTopicEnable=true

    启动

    1. 【NamesrvStartup】启动namesrv nohup bin/mqnamesrv -n 注册中心IP:9876 > mqnamesrv.log 2>&1 &

    2. 检查端口监听是否为0.0.0.0:9876/注册中心IP:9876 命令 netstat -anpt | grep 9876

    3. 【BrokerStartup】启动master节点 nohup sh bin/mqbroker -n 注册中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

    4. 【BrokerStartup】启动slave节点 nohup sh bin/mqbroker -n 注册中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

    5. 查看是否注册成功(集群信息) bin/mqadmin clusterList -n 注册中心IP:9876

    停止 不要使用kill -9!!!

    1. 停止 slave bin/mqshutdown broker

    2. 停止 master bin/mqshutdown broker

    3. 停止 namesrv bin/mqshutdown namesrv

    常用命令

    • cd rocketmq-all-4.4.0-bin-release/
    • 注册中心机器上

    ---集群相关

    查询集群信息 bin/mqadmin clusterList -n localhost:9876
    
    打印Broker配置 bin/mqbroker -m -n localhost:9876
    
    更新Broker配置 bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876
    
    查看Broker统计信息 bin/mqadmin brokerstatus –n localhost:9876 –b locahost:10909

    ---订阅组相关

    创建订阅组 bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName
    
    列出消费组 bin/mqadmin consumerProgress -n localhost:9876
    
    查看消费组IP bin/mqadmin consumerStatus -g GroupName -n localhost:9876
    
    查看消费组数据堆积 bin/mqadmin consumerProgress -n localhost:9876 -g GroupName
    
    删除订阅组 bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName

    ---Topic相关

    创建Topic bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName
    
    Topic列表 bin/mqadmin topicList -n localhost:9876
    
    发送Topic消息测试 bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024
    
    打印Topic消息 bin/mqadmin printMsg -n localhost:9876 -t TopicName
    
    Topic详情统计 bin/mqadmin topicstatus -n localhost:9876 -t TopicName
    
    获取Topic的cluster bin/mqadmin topicClusterList -n localhost:9876 -t TopicName
    
    删除Topic bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName
    
    查看Topic路由 bin/mqadmin topicRoute -n localhost:9876 -t TopicName
    
    查看Topic状态 bin/mqadmin topicStatus -n localhost:9876 -t TopicName
    
    根据ID查询消息 bin/mqadmin queryMsgById -i msgId -n localhost:9876
    
    根据偏移量查询消息 bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName

    broker配置说明

    基础配置

    配置描述默认值例子
    namesrvAddr nameServer地址,如果nameserver是多台集群的话,用分号分割 namesrvAddr=10.1.219.75:9876;10.1.219.76:9876
    brokerClusterName 所属集群名字。Cluster 的地址,如果集群机器数比较多,可以分成多个Cluster ,每个Cluster 供一个业务群使用 brokerClusterName=rocketmq-cluster
    brokerName Broker 的名称, Master 和Slave 通过使用相同的Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的Slave   brokerName=broker-a
    brokerId 一个Master Barker 可以有多个Slave, 0 表示Master ,大于0 表示不同的 Slave 的ID   brokerId=0
    fileReservedTime 在磁盘上保存消息的时长,单位是小时,自动删除超时的消息   fileReservedTime=48
    deleteWhen 与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点   deleteWhen=04
    brokerRole brokerRole 有3 种: SYNCMASTER(同步双写) 、ASYNCMASTER(异步复制) 、SLAVE 。关键词SYNC 和ASYNC 表示Master 和Slave 之间同步消息的机制, SYNC 的意思是当Slave 和Master 消息同步完成后,再返回发送成功的状态   brokerRole=SYNC_MASTER
    flushDiskType flushDiskType 表示刷盘策略,分为SYNCFLUSH 和ASYNCFLUSH两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写人磁盘后再返回成功状态;异步刷盘情况下,消息写人page_cache 后就返回成功状态   flushDiskType=ASYNC_FLUSH
    listenPort Broker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突   listenPort=10911
    storePathRootDir 存储消息以及一些配置信息的根目录   storePathRootDir=/app/custom/data/rocketmq/store-a

    进阶配置

    配置描述默认值例子
    autoCreateTopicEnable 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭   autoCreateTopicEnable=true
    defaultTopicQueueNums 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。   defaultTopicQueueNums=4
    autoCreateSubscriptionGroup 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭   autoCreateTopicEnable=true
    mapedFileSizeCommitLog commitLog每个文件的大小,默认1G 1G mapedFileSizeCommitLog=1073741824
    mapedFileSizeConsumeQueue ConsumeQueue每个文件默认存30W条,根据业务情况调整   mapedFileSizeConsumeQueue=300000

    存储配置

    配置描述默认值例子
    storePathRootDir 存储消息以及一些配置信息的根目录   storePathRootDir=/app/custom/data/rocketmq/store-a
    storePathCommitLog commitLog 存储路径   storePathCommitLog=/data/rocketmq/store/commitlog
    storePathConsumeQueue 消费队列存储路径存储路径   storePathConsumeQueue=/data/rocketmq/store/consumequeue
    storePathIndex 消息索引存储路径   storePathIndex=/data/rocketmq/store/index
    storeCheckpoint checkpoint 文件存储路径   storeCheckpoint=/data/rocketmq/store/checkpoint
    abortFile abort 文件存储路径   abortFile=/data/rocketmq/store/abort

    JAVA示例

      1. pom.xml

    <dependencies>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.5.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>4.5.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-tcnative</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
        </dependencies>

      2. Producer 生产者

    public class Producer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new
                    DefaultMQProducer("GroupName");
            // Specify name server addresses.
            producer.setNamesrvAddr("IP:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicName" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }
    }

      3. Consumer 消费者

    public class Consumer {
        public static void main(String[] args) {
            String topicName = "TopicName";
            DefaultMQPushConsumer consumer =
                    new DefaultMQPushConsumer("GroupName");
            consumer.setNamesrvAddr("IP:9876");
            try {
                consumer.subscribe(topicName, "*");
                /**
                 * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
                 * 如果非第一次启动,那么按照上次消费的位置继续消费
                 */
                consumer.setConsumeFromWhere(
                        ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                /**
                 * 如果是顺序消息,这边的监听就要使用MessageListenerOrderly监听
                 * 并且,返回结果也要使用ConsumeOrderlyStatus
                 */
                consumer.registerMessageListener(new MessageListenerOrderly() {
                    @Override
                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                        //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                        context.setAutoCommit(true);
                        try {
                            for (MessageExt msg : msgs) {
                                String recString = null;
                                try {
                                    recString = new String(msg.getBody(), "UTF-8");
                                } catch (UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(recString);
                        } catch (Exception e) {
                            e.printStackTrace();
                            //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                        //消费成功
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

      4. 打包成jar的插件[可选]

    <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <!--jdk 版本-->
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!--全限定名-->
                                        <mainClass>com.package.Consumer</mainClass>
                                    </transformer>
                                </transformers>
                                <artifactSet> </artifactSet>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
     个人微信,有什么建议、意见或补充,欢迎及时沟通!!!(添加时注明“博客园”,谢谢)
  • 相关阅读:
    VC(VISUAL_C++)虚拟键VK值列表
    关于新一轮QQ Tencent://Message 在线联系
    (记录) sql exists 应用及 order by注意点
    (记录)IE8 ..样式错乱解决
    jquery 关于ajax 中文字符长度过长后不执行
    DataList 嵌套绑定CheckBoxList [记录, 以免忘记哈.]
    (记录)MSSQL 的一些应用 查询数据统计适用 添加月份日号作为行记录
    数据结构回顾算法
    Modeling Our World笔记
    数据结构2数组
  • 原文地址:https://www.cnblogs.com/pidgey/p/11733719.html
Copyright © 2011-2022 走看看