zoukankan      html  css  js  c++  java
  • RocketMq 完整部署

    RocketMq 部署

    环境

    64bit OS, Linux/Unix/Mac is recommended;
    64bit JDK 1.8+;
    Maven 3.2.x;
    Git;
    4g+ free disk for Broker server
    

    物理机部署

    unzip rocketmq-all-4.4.0-bin-release.zip 
    cd rocketmq-all-4.4.0-bin-release/
    

    自定义日志目录

    # 在以下文件中替换默认的日志路径 ${user.home}/logs/rocketmqlogs/*
    conf/logback_broker.xml
    conf/logback_namesrv.xml
    conf/logback_tools.xml
    

    自定义参数和数据存放位置

    conf 文件夹里有三种配置

    • 2m-noslave 两主,无从的配置
    • 2m-2s-async 两主,两从,同步复制数据的配置
    • 2m-2s-sync 两主,两从,异步复制数据的配置
    ##以修改 conf/2m-noslave/broker-a.properties 为例
    
    #所属集群名字  
    brokerClusterName=DefaultCluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=127.0.0.1:9876
    
    #改成服务器的本机ip,重要!
    brokerIP1=127.0.0.1
    
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    
    #自定义数据存储路径
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    
    
    #限制的消息大小  
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    服务启动

    启动name server

      > nohup sh bin/mqnamesrv &
      > tail -f logs/rocketmqlogs/namesrv.log
      The Name Server boot success...
    

    启动broker

     > nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-a.propertie &
     > tail -f logs/rocketmqlogs/broker.log 
      The broker[%s, 172.30.30.233:10911] boot success...
    

    关停服务

    > sh bin/mqshutdown broker
    The mqbroker(36695) is running...
    Send shutdown request to mqbroker(36695) OK
    
    > sh bin/mqshutdown namesrv
    The mqnamesrv(36664) is running...
    Send shutdown request to mqnamesrv(36664) OK
    

    尝试发送消息

    > export NAMESRV_ADDR=localhost:9876
     > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
     SendResult [sendStatus=SEND_OK, msgId= ...
    
     > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
     ConsumeMessageThread_%d Receive New Messages: [MessageExt...
    

    常见报错

    发消息报错

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:588)
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1223)
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1173)
    	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
    	at com.flying.demo.Producer.main(Producer.java:25)
    

    原因:没有配置brokerIP

    部署 rockermq console

    建议docker 部署
    非docker部署参见官方文档 https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

    docker pull styletang/rocketmq-console-ng
    docker run --name rocketmq-console -dit -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" -p 8080:8080 styletang/rocketmq-console-ng bash
    # 进入docker查看
    docker exec -it rocketmq-console /bin/bash
    

    部署完成,到对应的端口查看即可。可以查看服务监控,历史消息详情,手动创建topic等。

    docker 部署

    官方镜像 rocketmqinc/rocketmq
    部署步骤参见 http://www.justdojava.com/2019/08/26/rocketmq-creator/

    Java 示例

    # 引入依赖
    implementation 'org.apache.rocketmq:rocketmq-client:4.3.0'
    
    package mq;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            //创建一个消息生产者,并设置一个消息生产者组
            DefaultMQProducer producer = new DefaultMQProducer("producer_test");
    
            //指定 NameServer 地址
            producer.setNamesrvAddr("127.0.0.1:9876");
    
            //初始化 Producer,整个应用生命周期内只需要初始化一次
            producer.start();
    
            for (int i = 0; i < 5; i++) {
                //创建一条消息对象,指定其主题、标签和消息内容,如果服务不支持自动创建topic的话,需要先手动创建topic,也可以在rocketmq console里操作
                Message msg = new Message(
                        "topic_test" /* 消息主题名 */,
                        "TagA" /* 消息标签 */,
                        ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
                );
    
                //发送消息并返回结果
                SendResult sendResult = producer.send(msg);
    
                System.out.printf("%s%n", sendResult);
            }
    
            // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
            producer.shutdown();
        }
    
    }
    
    
    package mq;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Date;
    import java.util.List;
    
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            //创建一个消息消费者,并设置一个消息消费者组
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
            //指定 NameServer 地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
            //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅指定 Topic 下的所有消息
            consumer.subscribe("topic_test", "*");
    
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                    //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
                    if (list != null) {
                        for (MessageExt ext : list) {
                            try {
                                System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 消费者对象在使用之前必须要调用 start 初始化
            consumer.start();
            System.out.println("消息消费者已启动");
        }
    }
    
    
  • 相关阅读:
    数据库设计优化(一)--基础
    数据库设计--范式原则
    迭代器 与 foreach 的区别
    DBeaver中如何调整SQL编辑器的字体大小
    腾讯课堂下载回放视频
    超级美味的大盘鸡做法
    关闭或开启Win10系统的自动更新
    geoserver发布地图瓦片影像数据
    使用GeoServer发布Shapfile数据
    GeoServer下载与安装(Windows版)
  • 原文地址:https://www.cnblogs.com/ylty/p/11851491.html
Copyright © 2011-2022 走看看