zoukankan      html  css  js  c++  java
  • centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)

    1.maven 安装
    wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
    yum -y install apache-maven
    2.ant 安装
    yum -y install ant
    
    3.git 安装
    yum install git
    #查看版本
    git --version
    #显示 git version +版本号 表示成功
    #配置 git 名称以及邮箱
    git config --global user.name "Your Name"
    git config --global user.email "user@youremail" 
    4.rocketmq安装(注意这里版本是4.2.0)

    cd /usr/local/rocketmq(没有则创建目录)
    git clone -b develop https://github.com/apache/incubator-rocketmq.git

    cd incubator-rocketmq
    mvn -Prelease-all -DskipTests clean install -U

    ------------------

    一段长时间的maven 依赖下载


    -------------------

    cd distribution/target/apache-rocketmq

    2)配置文件
    vim /etc/profile 添加

    #apache rocket-mq
    export ROCKETMQ_HOME=/usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq
    export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
    export NAMESRV_ADDR=自己服务器ip:9876

       使profile 生效

       source /etc/profile

      进入到 /usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq/bin 目录下:添加权限

      chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv

      启动:

    nohup  mqnamesrv & 

    //查看启动日志,默认在 bin 目录的 nohup.out 文件中
    tail -f nohup.out
    //显示如下信息表示启动成功

    3)由于自己的服务器使用的是阿里云的 2g 内存,启动那个 mqnamesrv后,启动mqbroker时候需要设置下内存大小,否则会报错

    vim runbroker.sh(因为mqbroker脚本里面调用了runbroker.sh:
    sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@) 修改下图:


    之后 启动 mqbroker并将启动日志写入到指定位置.进入到target/bin目录

    # Redirections must precede '&': everything after '&' is parsed as a new command,
    # so the log redirect would otherwise not apply to mqbroker at all.
    nohup mqbroker > /var/log/mq.log 2>&1 &

      

       启动成功后,使用ps aux|grep rocketmq如下图

    4)写测试用例

      1.pom.xml引入rocket包(引入的是4.1的包,4.2的引入后无法使用)

    <!-- rocketmq-all 4.2.0: the author reports that this dependency could not be used -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>4.2.0</version>
    <type>pom</type>
    </dependency>
    <!-- rocketmq-client 4.1.0-incubating: the client dependency the author used instead -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
    </dependency>


    生产者main方法:
    package cn.rocketmq;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    
    /**
     * Sample RocketMQ producer that sends 100 messages to {@code testTopic},
     * using {@code tagA} on even iterations and {@code tagB} on odd iterations.
     *
     * <p>Create by fan on 2018/4/16
     */
    public class TestProductRocketMq {
        /**
         * Starts the producer, sends the sample messages and shuts the producer down.
         *
         * @param args unused
         * @throws InterruptedException if the pause between sends is interrupted
         * @throws RemotingException    if a send fails at the remoting layer
         * @throws MQClientException    if the producer cannot start or a send is rejected client-side
         * @throws MQBrokerException    if the broker reports an error for a send
         */
        public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            final DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producerGroupName");
            // NOTE(review): the article later hits "connect to <:10909> failed"; disabling the
            // VIP channel may be relevant to that error - confirm before enabling.
            // defaultMQProducer.setVipChannelEnabled(false);
            defaultMQProducer.setNamesrvAddr("47.98.111.19:9876");

            // Let a start failure propagate: sending through a producer that never
            // started can only fail later with a less helpful error.
            defaultMQProducer.start();

            try {
                for (int i = 0; i < 100; i++) {
                    // Build a fresh message on every iteration so even iterations
                    // always carry tagA and odd iterations always carry tagB.
                    final Message message;
                    if (i % 2 == 0) {
                        message = new Message("testTopic", "tagA", "keyA",
                                "Hello RocketMq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    } else {
                        message = new Message("testTopic", "tagB", "keyB",
                                "Hello RocketMq.I'm your user".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    }
                    SendResult sendResult = defaultMQProducer.send(message);
                    Thread.sleep(100);
                    System.out.println("tags send result:" + sendResult);
                }
            } finally {
                // Always release the producer, including when a send throws.
                defaultMQProducer.shutdown();
            }
        }
    }





    消费者方法:

    package cn.rocketmq;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * Sample RocketMQ push consumer that subscribes to {@code testTopic} with the
     * tag filter {@code tagA || tagB} and prints every message it receives.
     *
     * <p>Create by fan on 2018/4/16
     */
    public class TestConsumeRocketMq {
        /**
         * Configures the consumer, registers the message listener and starts consuming.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("testProducerGroupName");
            defaultMQPushConsumer.setNamesrvAddr("47.98.111.19:9876");

            defaultMQPushConsumer.subscribe("testTopic","tagA || tagB");
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(Thread.currentThread().getName() + "Receive new message:" + list);
                    // Handle every message in the batch: the whole batch is acknowledged
                    // below, so anything skipped here would be silently lost.
                    for (MessageExt messageExt : list) {
                        System.out.println("messageExt:" + messageExt);
                        if (messageExt == null || !"testTopic".equals(messageExt.getTopic())) {
                            continue;
                        }
                        String tags = messageExt.getTags();
                        if ("tagA".equals(tags) || "tagB".equals(tags)) {
                            // Decode with an explicit charset instead of the platform default.
                            String mess = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                            System.out.println("mess " + tags + " consume:" + mess);
                        }
                    }
                    // Acknowledge the batch as consumed.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            defaultMQPushConsumer.start();

            System.out.println("Consume start.");

        }
    }

    之后运行main方法报错:

    RocketMq Exception "connect to <:10909> failed"

    原因是:centos 防火墙没有开放 10909 以及 9876 端口。解决办法是开放这两个端口(添加后需执行 firewall-cmd --reload 使规则生效):

     firewall-cmd --zone=public --add-port=10909/tcp --permanent 

     firewall-cmd --zone=public --add-port=9876/tcp --permanent 

     
    之后,又碰到下面的问题:
     

    google了半天,尚未解决。。。 

     

      

  • 相关阅读:
    ECMAScript5之Object学习笔记(二)
    ECMAScript5之Object学习笔记(一)
    【笔记】css 自定义select 元素的箭头样式
    【笔记】h5 页面唤起电话呼叫
    【笔记】vue-cli 打包后路径问题出错的解决方法
    【笔记】BFC 模型知识整理
    【笔记】浏览器的缓存
    【笔记】web 的回流与重绘及优化
    【js 笔记】读阮一峰老师 es6 入门笔记 —— 第二章
    【js 笔记】读阮一峰老师 es6 入门笔记 —— 第一章
  • 原文地址:https://www.cnblogs.com/thinkingandworkinghard/p/8857501.html
Copyright © 2011-2022 走看看