zoukankan      html  css  js  c++  java
  • RocketMQ 集群监控以及Hello World

    RocketMQ 目前有两个版本  alibaba版本和apache版本

    一、alibaba版本

    tomcat部署:

      apache-tomcat-7.0.90.tar.gz

      jdk7

      虚拟机redhat6.5:192.168.1.201

    1、上传rocketmq-console.war包到tomcat下webapps

    #新建目录
    mkdir rocketmq-console
    #解压war文件
    unzip rocketmq-console.war -d rocketmq-console
    #删除war
    rm -f rocketmq-console.war

    2、修改配置  指定nameserveraddr

    cd /usr/local/tomcat7/webapps/rocketmq-console/WEB-INF/classes
    
    vim config.properties
    
    rocketmq.namesrv.addr=192.168.1.201:9876;192.168.1.202:9876
    throwDone=true

    3、启动tomcat

    [root@201 bin]# /usr/local/tomcat7/bin/startup.sh
    Using CATALINA_BASE:   /usr/local/tomcat7
    Using CATALINA_HOME:   /usr/local/tomcat7
    Using CATALINA_TMPDIR: /usr/local/tomcat7/temp
    Using JRE_HOME:        /usr/local/java/jdk1.7.0_80
    Using CLASSPATH:       /usr/local/tomcat7/bin/bootstrap.jar:/usr/local/tomcat7/bin/tomcat-juli.jar
    Tomcat started.
    [root@201 bin]# jps
    2133 Jps
    1483 NamesrvStartup
    2094 Bootstrap
    1529 BrokerStartup

    4、浏览器访问

    http://192.168.1.201:8080/rocketmq-console

    /**
     * Copyright (C) 2010-2013 Alibaba Group Holding Limited
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package rocketmq.quickstart;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * Producer,发送消息
     * 
     */
    public class Producer 
    {
        public static void main(String[] args) throws MQClientException,
                InterruptedException 
        {
            DefaultMQProducer producer = new DefaultMQProducer("QuickStart_Producer");
            producer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
            producer.start();
    
            for (int i = 0; i < 1000; i++) 
            {
                try 
                {
                    Message msg = new Message("TopicQuickStart",// topic
                            "TagA",// tag
                            ("Hello RocketMQ " + i).getBytes()// body
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }
    /**
     * Copyright (C) 2010-2013 Alibaba Group Holding Limited
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package rocketmq.quickstart;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException,
                MQClientException 
        {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStart_Consumer");
            consumer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicQuickStart", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (int i = 0; i < msgs.size(); i++) {
                            MessageExt msg = msgs.get(i);
                            String topic = msg.getTopic();
                            String tag = msg.getTags();
                            String body = new String(msg.getBody(), "UTF-8");
                            System.out.println(Thread.currentThread().getName()
                                    + " Receive New Messages: topic=" 
                                    + topic+",tag="
                                    + tag +",body="
                                    + body);
    
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }

    生产消息:

    消费消息:

    监控消息:

     

    二、apache版本

    贡献给apache之后,上面的那个工具就无法使用了,不过今天从网上找到了个新的管理界面。

    github地址:https://github.com/apache/incubator-rocketmq-externals

    2018年1月23日,更新最新github地址为:https://github.com/apache/rocketmq-externals

    参照帮助文件使用即可:

    帮助文档路径:https://github.com/apache/incubator-rocketmq-externals/blob/master/rocketmq-console/README.md

    具体如下:

    1、修改配置文件,使管理界面与rocketmq集群产生关联。

    incubator-rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties

    修改内容及修改结果如下图所示:

    2、编译rocketmq-console

    编译命令:mvn clean package -Dmaven.test.skip=true(注意:不要直接使用mvn package,会提示很多错误)

    3、将编译好的jar包上传到linux服务器

    (如果直接在Linux环境上编译,可以省略这步)

    我这里上传到了本地虚拟机192.168.6.5上。路径为:/home/hadmin/jar

    4、运行jar包

    命令:java -jar target/rocketmq-console-ng-1.0.0.jar

    5、使用浏览器访问管理界面

    方位地址:http://192.168.6.5:8080/

    6、可能遇到的问题

    画面可以正常启动,不过从控制台的监控日志上看,存在如下的错误日志。

    org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <192.168.1.80:10918> timeout, 5000(ms)

    原因是isVIPChannel默认为true,会监控rocketmq的vip通道,将该属性设置为false即可。

    设置后的配置文件如下所示:

    复制代码
    server.contextPath=
    server.port=8080
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=192.168.1.80:9876;192.168.1.81:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=false
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/home/hadmin/data/rocketmq
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
    复制代码

    参考:https://www.cnblogs.com/quchunhui/p/7284752.html

  • 相关阅读:
    第一节:SpringMVC概述
    SpringMVC【目录】
    Windows 系统快速查看文件MD5
    (error) ERR wrong number of arguments for 'hmset' command
    hive使用遇到的问题 cannot recognize input
    Overleaf支持的部分中文字体预览
    Understanding and Improving Fast Adversarial Training
    Django2实战示例 第十三章 上线
    Django2实战示例 第十二章 创建API
    Django2实战示例 第十一章 渲染和缓存课程内容
  • 原文地址:https://www.cnblogs.com/cac2020/p/9447938.html
Copyright © 2011-2022 走看看