zoukankan      html  css  js  c++  java
  • rocketmq linux 安装教程 以及本地连接远程

    rocketmq 官网网址:http://rocketmq.apache.org/docs/quick-start/

    准备

      linux 服务器

      操作系统 CentOS

      

    1.下载zip 到linux系统上(下载二进制包,不要下载资源包)

      随便下载一个镜像仓库下载:rocketmq-all-4.7.1-bin-release.zip

    2.开始安装

      2.1 rocketmq 是基于JVM运行的,所以要有java 环境  java -version 查看,没有则需要安装

       

       2.2 用unzip rocketmq-all-4.7.1-bin-release.zip 解压压缩包

      2.3  重命名  rename rocketmq-all-4.7.1-bin-release/  rocketmq rocketmq-all-4.7.1-bin-release/

    3.启动

      3.1 修改日志位置:rocketmq 默认的日志位置再${user.home}  linux 对应的位置在 /root/home 文件下,修改日志位置到rocketmq 下

        cd /data/middleware/rocketmq/conf  打开 logback_namesrv.xml 

        将${user.home} 修改为 /data/middleware/rocketmq

        在/data/middleware/rocketmq 创建logs 文件夹

      

     

     3.1 找到bin目录下 /data/middleware/rocketmq/bin 

        sh mqnamesrv 启动server 

      启用为后台运行,并输入运行日志到namesrv.log 中

      nohup sh mqnamesrv  > /data/middleware/rocketmq/logs/rocketmqlogs/namesrv.log 2>&1 &

     3.2 修改broker 的日志文件地址和启动broker

      

       启动报错,因为mq需要的内存空间不足,需要重新分配内存空间

      查看mqbroker 脚本发现最终执行的是runbroker.sh .在其中看到JAVA_OPT 的配置,修改默认配置

      

     红框中配置的堆栈空间已经大于服务器剩余内存2G,所以设置为1G

     再次启动

      nohup sh  mqbroker -n localhost:9876 > /data/middleware/rocketmq/logs/rocketmqlogs/broker.log 2>&1 &

     4.测试是否可运行

      4.1 暴露服务地址  export NAMESRV_ADDR=localhost:9876

      4.2 启动消费者:

      

      4.3 启动生产者:打开一个新控制台,生产者开始投递消息

      

       4.4 消费者开始消费

      

     5.本地连接远程  xxx.xxx.xxx.xxx 表示公网ip

      5.1 本地也需要安装rocketmq ,修改rocketmq 下conf/broker.xml 增加如下配置

        brokerIP1=xxx.xxx.xxx.xxx 

      5.2 服务器安全组开放9876 端口

      5.3 windows 本地启动:

         start .mqnamesrv

        start .mqbroker -n xxx.xxx.xxx.xxx:9876

      5.4 测试代码

        消费者

      

    package com.example.demo.rocketmq;
    import java.util.List;
    
    /**
    *
    * @description: 消费者
    *
    * @author: coderxiao
    *
    * @create: 2020-09-23 20:13
    **/
    public class RocketConsumer {
    
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xy");
            consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
            try {
                consumer.subscribe("TopicTest","*");
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("业务数据"+new String(msgs.get(0).getBody()));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            try {
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
        }
    
    }

      生产者

      

    public class RocketProducer {
    
        @Test
        public  void test1() {
            DefaultMQProducer producer=new DefaultMQProducer("xy");
            producer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
            producer.setCreateTopicKey("TopicTest");
            try {
                producer.start();
                for (int i=0;i<10;i++){
                    try {
                        Message message=new Message("TopicTest","tagA", ("who are you"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                        SendResult sendResult=producer.send(message);
                        System.out.println("发送结果:"+sendResult);
                    } catch (RemotingException e) {
                        e.printStackTrace();
                    } catch (MQBrokerException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }catch (Exception e){e.printStackTrace();}
                }
    
            } catch (MQClientException e) {
                e.printStackTrace();
            }finally {
                producer.shutdown();
            }
        }
    }

    解决:loseChannel: close the connection to remote address[] result: true 这个问题

    本地连接远程消费

      

        

      

      

  • 相关阅读:
    笔记:国际化
    【推荐系统】知乎live入门3.召回
    【推荐系统】知乎live入门2.细节补充
    【推荐系统】知乎live入门1.推荐概览与框架
    【推荐系统】知乎live入门
    【学习总结】SQL学习总结-总
    【问题解决方案】git clone失败的分析和解决
    【问题解决方案】GitHub上克隆项目到本地
    【JAVA】eclipse里代码整个前移或者后移的快捷键
    【JAVA】Java 异常中e的getMessage()和toString()方法的异同
  • 原文地址:https://www.cnblogs.com/blogxiao/p/13873275.html
Copyright © 2011-2022 走看看