zoukankan      html  css  js  c++  java
  • Docker部署RocketMQ(JDK11)

     

    说起微服务,不谈容器,不谈云,那还谈个啥?容器中又以Docker最为流行,那么我们今天就来实践下容器化微服务,然后顺带解决下各种疑难杂症。

    环境: Idea2019.03/Gradle6.0.1/JDK11.0.4/RocketMQ4.6.0/Linux8.0/Docker19.03.5

    难度: 新手--战士--老兵--大师

    目标:

    1. 理解RocketMQ的原理
    2. Linux8.0+JDK11+RocketMQ部署
    3. Docker部署RocketMQ集群
    4. MQ-Api应用测试

    说明:

    前面有涉及RocketMQ,本篇作为补充。为了遇见各种问题,同时保持时效性,我尽量使用最新的软件版本。代码中大量使用注释,理解更轻松。代码地址:其中的day26,https://github.com/xiexiaobiao/dubbo-project.git

     

    原创文章,谢绝一切形式转载,否则追究法律责任。

     本文只发表在"公众号"和"博客园",其他均属复制粘贴!如果觉得排版不清晰,请查看公众号文章。 

    正文:

    01-RocketMQ原理

    生产使用部署图:

    • Producer/Consumer可多个成组,一般一个生产者组/消费者组对应一个微服务
    • Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer,主从间进行Async/Sync同步,Master节点之间不做数据交互,Master宕机,则该组只读不可写,slave不会自动转为master
    • Name Server是一个几乎无状态节点,节点之间无任何信息同步
    • 每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server
    • Producer/Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息

    存储模式:

    每个Topic在Broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际是指向commitLog的消息索引,消息实际保存在commitLog上。这也是Rocket支持消息重发、回溯的关键。

     

    02-Linux下单机安装

    Linux和JDK11,略!我使用rocketmq-all-4.6.0-bin-release.zip,解压:

    unzip rocketmq-all-4.6.0-source-release.zip

    解压后的目录为:/usr/rocketmq/rocketmq-all-4.6.0-bin-release

    2.1-启动nameServer: sh bin/mqnamesrv

    通过查看其shell脚本可以发现,实质是去调用runserver.sh,因rocketmq4.6启动脚本和JDK11不兼容,典型报错如下:

    因此去修改runserver.sh

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runserver.sh

    GC的机制,JDK11使用G1不使用CMS,删除JVM参数:
    
    -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection  -XX:-UseParNewGC -XX:+PrintGCDetails
    
    删除GCLog相关整行:
    
    JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    
    JDK11不再独立分jre,删除整行:
    
    JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
    
    修改:`-Xloggc` 为 `-Xlog:gc`
    
    修改类路径,否则提示找不到mainClass,增加一行:
    
    export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
    
    删除整行:
    export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

    再启动NameServer:

    2.2-启动broker实例,并用-n指定namesrv:

    sh bin/mqbroker -n localhost:9876

    通过查看其shell脚本可以发现实质是去调用runbroker.sh,有JDK11兼容问题,因此去修改runbroker.sh

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/runbroker.sh

    删除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
    增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
    
    删除:JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    
    删除:-XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+PrintGCDetails
    
    删除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"

    再启动Broker:

    2.3-还要修改tools.sh

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim ./bin/tools.sh

    删除:JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
    删除:#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
    增加:export CLASSPATH=${BASE_DIR}/lib/rocketmq-namesrv-4.6.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

    2.4-内存要足够!如遇报错信息:

    Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000700000000, 4294967296, 0) failed; error='Not enough space' (errno=12)

    下图显示mmap默认需要4G内存:

    加大内存!或者通过上面的两个sh文件中参数进行修改: JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" 改为: -Xms2g -Xmx2g -Xmn1g

    2.5-安装RocketMq客户端 https://github.com/apache/rocketmq-externals,其中的子项目:rocketmq-console,注意修改下pom依赖和properties文件,再打包成jar运行即可(具体见我往期文章)。 这个console比较简陋,凑合下:

     

    03-Docker部署RocketMQ

    因Docker容器技术还是必须要掌握的,我也拿来用一下,先使用三个container来分别提供nameSrv,broker和console服务,部署图如下:

    安装Docker到Linux,略!Docker安装rocketMQ:

    [root@server224 docker]# docker search rocketmq

    这里选择foxiswho/rocketmq,因为rocketmqinc/rocketmq里面没有nameServer。 查看镜像详细信息命令:

    [root@server224 docker]# docker inspect foxiswho/rocketmq

    查看当前镜像所有的版本shell命令:

    curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags | tr -d '[[]" ]' | tr '}' ' ' | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s ",image,$3)}}'

    下载镜像:

    [root@server224 docker]# docker pull foxiswho/rocketmq:server-4.5.2

    [root@server224 docker]# docker pull foxiswho/rocketmq:broker-4.5.2

    先启动nameserver容器:

    [root@server224 docker]# docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/rocketmq:server-4.5.2

    然后启动broker容器:

    [root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" foxiswho/rocketmq:broker-4.5.2

    再启动console容器:

    [root@server224 docker]# docker pull styletang/rocketmq-console-ng:latest

    [root@server224 docker]# docker run -d --name rmqconsole -p 8180:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng

    进入某个容器示例:

    [root@server224 docker]# docker exec -it rmqnamesrv /bin/bash

    [rocketmq@4c2ae3332c7c bin]$ ls

    可见该image下版本为openJDK1.8!

    代码测试:

    我这里只是用了一个简单的Producer/Consumer,之前的文章中已有使用RocketMq实现分布式事务的项目,这里就不重复了。

    写个生产者,com.biao.rmq.Producer

    public class Producer {
        public static void main(String[] args) throws Exception{
            // RPC hook to execute per each remoting command execution.
            // 添加钩子函数
            RPCHook rpcHook = new RPCHook() {
                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    System.out.println("Producer RPC hook doBeforeRequest");
                }
    
                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                    System.out.println("Producer RPC hook doAfterResponse");
                }
            };
            //DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
            //        boolean enableMsgTrace, final String customizedTraceTopic)
            // Namespace for this MQ Producer instance.
            // DefaultMQProducer mqProducer = new DefaultMQProducer("namespace01","producerGroup01",
            //        rpcHook,true,"customizedTraceTopic01");
    
            // 以下为简单类型MQProducer
            DefaultMQProducer mqProducer = new DefaultMQProducer("producerGroup01");
            // producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
            mqProducer.setNamesrvAddr("192.168.1.224:9876");
            mqProducer.setInstanceName("rocket-Producer-instance01");
            mqProducer.start();
    
            for (int i = 0; i < 3; i++) {
                LocalDateTime localDateTime = LocalDateTime.now();
                Message msg = new Message("topic01","tag01",
                        ("message test. time:"+localDateTime).getBytes(StandardCharsets.UTF_8));
                // 获取消息发送的状态信息,包含了sendStatus/msgId/offsetMsgId/messageQueue/queueOffset,
                // 从而可以进一步根据SendResult内容做业务处理
                SendResult sendResult = mqProducer.send(msg,500L);
                System.out.println(sendResult);
            }
    
            mqProducer.shutdown();
        }
    }

    运行结果:

    写个消费者,com.biao.rmq.Consumer

    public class Consumer {
        public static void main(String[] args) throws Exception{
    
    /*        MQAdmin mqAdmin = new ;
            MQAdminImpl mqAdmin1 = new MQAdminImpl();*/
    
            // RPC hook to execute per each remoting command execution.
            // 添加钩子函数
    
            RPCHook rpcHook = new RPCHook() {
                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    System.out.println("Consumer RPC hook doBeforeRequest");
                }
    
                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                    System.out.println("Consumer RPC hook doAfterResponse");
                }
            };
    
            // Strategy Algorithm for message allocating between consumers
            AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueStrategy() {
                @Override
                public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                    return null;
                }
    
                @Override
                public String getName() {
                    return null;
                }
            };
    
            // 2022年将移除DefaultMQPullConsumer,已@Deprecated,推荐未来使用 DefaultLitePullConsumer,但还没实现,这是耍人??
            /*DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("namespace02",
                    "consumerGroup02",rpcHook,allocateMessageQueueStrategy,
                    true,"customizedTraceTopic01");*/
            // simple constructor
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup02");
            consumer.setNamesrvAddr("192.168.1.224:9876");
            // Specify where to start in case the specified consumer group is a brand new one.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // tag 的匹配模式 only support or operation such as "tag1 || tag2 || tag3"
            consumer.subscribe("topic01", "*");
            // A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // 对消息的处理业务
                    System.out.printf("%s Receive New Messages:%n", Thread.currentThread().getName());
                    for (MessageExt msg : msgs
                         ) {
                        System.out.println("host >> " + msg.getBornHost());
                        System.out.println("MsgId >> " + msg.getMsgId());
                        System.out.println("body >> "  + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.setInstanceName("rocket-Consumer-instance01");
            // Launch the consumer instance.
            consumer.start();
            System.out.printf("Consumer Started.%n");
            //defaultMQPushConsumer.shutdown();
        }
    }

    运行结果:

    04-Docker部署HA集群

    HA集群部署图,全容器实现:

    但这里有个比较麻烦的地方,NameSrv要指定不同端口需要 –c 参数,但此image里并没有使用此参数,需重建image,这就扯的有点远了,故我将一个NameSrv装linux上,一个NameSrv装container上,权当替代方案了,但不影响效果。 linux上启动的NameSrv,可修改端口为9877,先建一个文件,只修改端口号: /usr/docker/docker/namesrv.properties

    启动时:

    [root@server224 rocketmq-all-4.6.0-bin-release]# ./bin/mqnamesrv -c /usr/docker/docker/namesrv.properties

    各容器启动配置脚本,rocket提供了模板文件:

    修改下启动配置文件,即clusterName,IP和port,这里以asyn中的broker-a.properties/broker-a-s.properties为例,其他不需要改:

    启动容器命令示例,其他类似:

     

    [root@server224 docker]#docker run -d -p 11911:11911 -p 11909:11909 --name rmqbroker3 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/a-s.properties:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2

     

    window上访问地址:http://192.168.1.224:8180/ 

    问题:

    1.常规部署时无法第二次启动broker,查看broker.log也无异常,这个问题,花了好长时间找答案!终于在store.log中发现有异常信息:

    Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make public void jdk.internal.ref.Cleaner.clean() accessible: module java.base does not "exports jdk.internal.ref" to unnamed module @3590fc5b

    然后查阅得知是JDK11模块化引起的问题,需修改jvm启动参数:

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim bin/runbroker.sh

    增加: JAVA_OPT="${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED"

    然后启动broker,却又发现broker使用了172.17.0.1这个IP地址,而nameSrv使用的是虚机另一ip 192.168.1.224,导致应用无法获取topic,即broker与nameSrv通讯失败,原因是安装docker后,docker自动安装了一个虚拟网卡,用于管理所有容器的:

    修改broker配置,指定broker的ip:

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim conf/broker.conf

    并在启动时使用配置文件:

    [root@server224 rocketmq-all-4.6.0-bin-release]# sh ./bin/mqbroker -n localhost:9876 -c ./conf/broker.conf

    再次启动nameSrv --> broker --> app终于成功!

     

    2.Docker中broker无法启动,状态一直是Exited

    [root@server224 docker]# docker logs rmqbroker

    虚机本地建立broker.conf文件,

    [root@server224 rocketmq-all-4.6.0-bin-release]# vim /usr/docker/docker/broker.conf

    再启动broker,命令中加入 -v 参数,做一个本地文件到容器的映射:

    [root@server224 docker]#docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -v /usr/docker/docker/broker.conf:/etc/rocketmq/broker.conf foxiswho/rocketmq:broker-4.5.2

    终于正常:

     

    3.容器模式下应用发送消息报错:

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

    原因是上面的映射文件broker.conf中brokerIP地址未设置,会导致brokerIP使用container的ip如172.17.0.3。

    此篇完!

     

    原创文章,谢绝任何形式转载,否则追究法律责任。

    我的微信公众号,只发原创文章。

    往期文章:

    1. 流式计算(五)-Flink 计算模型
    2. 流式计算(四)-Flink Stream API 篇二
    3. 流式计算(三)-Flink Stream 篇一
    4. 流式计算(一)-Java8Stream

  • 相关阅读:
    896. Monotonic Array单调数组
    865. Smallest Subtree with all the Deepest Nodes 有最深节点的最小子树
    489. Robot Room Cleaner扫地机器人
    JavaFX
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
  • 原文地址:https://www.cnblogs.com/xxbiao/p/12119733.html
Copyright © 2011-2022 走看看