  • RocketMQ Cluster Monitoring and Hello World

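    RocketMQ currently comes in two versions: the Alibaba version and the Apache version.

    I. Alibaba version

    Tomcat deployment:

      apache-tomcat-7.0.90.tar.gz

      jdk7

      Virtual machine, Red Hat 6.5: 192.168.1.201

    1. Upload the rocketmq-console.war package to the webapps directory under Tomcat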

    # create the directory
    mkdir rocketmq-console
    # unpack the war file
    unzip rocketmq-console.war -d rocketmq-console
    # delete the war
    rm -f rocketmq-console.war

    2. Edit the configuration to specify the name server address

    cd /usr/local/tomcat7/webapps/rocketmq-console/WEB-INF/classes
    
    vim config.properties
    
    rocketmq.namesrv.addr=192.168.1.201:9876;192.168.1.202:9876
    throwDone=true
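
    The rocketmq.namesrv.addr value is a semicolon-separated list of host:port entries. A typo there only shows up once the console fails to connect, so here is a minimal sketch for checking the value up front. NamesrvAddrList and parse are hypothetical names made up for this example; this is not how RocketMQ itself parses the setting.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (hypothetical, not part of RocketMQ). */
class NamesrvAddrList {

    /** Splits "host1:port1;host2:port2" into trimmed "host:port" entries and validates each one. */
    static List<String> parse(String addrList) {
        List<String> result = new ArrayList<String>();
        for (String entry : addrList.split(";")) {
            String addr = entry.trim();
            if (addr.isEmpty()) {
                continue; // tolerate empty entries such as a trailing ';'
            }
            int colon = addr.lastIndexOf(':');
            if (colon <= 0 || colon == addr.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + addr);
            }
            // NumberFormatException (a subclass of IllegalArgumentException) if the port is not numeric
            int port = Integer.parseInt(addr.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + addr);
            }
            result.add(addr);
        }
        return result;
    }

    public static void main(String[] args) {
        // The value configured in config.properties above.
        for (String addr : parse("192.168.1.201:9876;192.168.1.202:9876")) {
            System.out.println(addr);
        }
    }
}
```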

    3. Start Tomcat

    [root@201 bin]# /usr/local/tomcat7/bin/startup.sh
    Using CATALINA_BASE:   /usr/local/tomcat7
    Using CATALINA_HOME:   /usr/local/tomcat7
    Using CATALINA_TMPDIR: /usr/local/tomcat7/temp
    Using JRE_HOME:        /usr/local/java/jdk1.7.0_80
    Using CLASSPATH:       /usr/local/tomcat7/bin/bootstrap.jar:/usr/local/tomcat7/bin/tomcat-juli.jar
    Tomcat started.
    [root@201 bin]# jps
    2133 Jps
    1483 NamesrvStartup
    2094 Bootstrap
    1529 BrokerStartup

    4. Open the console in a browser

    http://192.168.1.201:8080/rocketmq-console
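
    If the page does not load, it helps to first check whether the ports are reachable at all from the client machine (a firewall on the VM is a common cause). The sketch below is a plain TCP connect test using only the JDK; PortCheck and isReachable are hypothetical names, and the 2000 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustrative helper (hypothetical): checks whether a TCP port accepts connections. */
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable: treat all of these as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Hosts and ports taken from the setup above.
        System.out.println("tomcat  8080: " + isReachable("192.168.1.201", 8080, 2000));
        System.out.println("namesrv 9876: " + isReachable("192.168.1.201", 9876, 2000));
    }
}
```

    A successful connect only shows that something is listening on the port; it does not prove that the console or the name server is healthy.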

    /**
     * Copyright (C) 2010-2013 Alibaba Group Holding Limited
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package rocketmq.quickstart;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * Producer: sends messages
     * 
     */
    public class Producer 
    {
        public static void main(String[] args) throws MQClientException,
                InterruptedException 
        {
            DefaultMQProducer producer = new DefaultMQProducer("QuickStart_Producer");
            producer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
            producer.start();
    
            for (int i = 0; i < 1000; i++) 
            {
                try 
                {
                    Message msg = new Message("TopicQuickStart",// topic
                            "TagA",// tag
                            // encode explicitly as UTF-8 to match the consumer's decoding
                            ("Hello RocketMQ " + i).getBytes("UTF-8")// body
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }
    /**
     * Copyright (C) 2010-2013 Alibaba Group Holding Limited
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package rocketmq.quickstart;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Consumer: subscribes to messages
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException,
                MQClientException 
        {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStart_Consumer");
            consumer.setNamesrvAddr("192.168.1.201:9876;192.168.1.202:9876");
            /**
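             * Sets whether the Consumer starts consuming from the head or the tail of the queue on its first start.<br>
             * On later starts it resumes from the position where it last stopped.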
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicQuickStart", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (int i = 0; i < msgs.size(); i++) {
                            MessageExt msg = msgs.get(i);
                            String topic = msg.getTopic();
                            String tag = msg.getTags();
                            String body = new String(msg.getBody(), "UTF-8");
                            System.out.println(Thread.currentThread().getName()
                                    + " Receive New Messages: topic=" 
                                    + topic+",tag="
                                    + tag +",body="
                                    + body);
    
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }
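
    The message body travels as a raw byte[], and the consumer above decodes it as UTF-8, so the producing side needs to encode with the same charset. For the ASCII-only "Hello RocketMQ" text almost any charset gives the same bytes, but a non-ASCII body would not survive a mismatch. Here is a minimal round-trip sketch using only the JDK; BodyCodec is a hypothetical helper name, not part of the RocketMQ client.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper (hypothetical): encodes and decodes message bodies with one fixed charset. */
class BodyCodec {

    /** Encodes the text the way the producer side should, always as UTF-8. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a body the way the consumer above does, as UTF-8. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u6d88\u606f" is a two-character non-ASCII sample; each character takes 3 bytes in UTF-8.
        String original = "Hello RocketMQ \u6d88\u606f 0";
        byte[] body = encode(original);
        System.out.println("bytes=" + body.length + ", roundTripOk=" + decode(body).equals(original));
    }
}
```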

    Producing messages: [screenshot]

    Consuming messages: [screenshot]

    Monitoring messages: [screenshot]


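    II. Apache version

    After RocketMQ was donated to Apache, the tool above could no longer be used, but today I found a new management console online.

    GitHub address: https://github.com/apache/incubator-rocketmq-externals

    Update, 23 January 2018: the latest GitHub address is https://github.com/apache/rocketmq-externals

    Just follow the help document:

    Help document path: https://github.com/apache/incubator-rocketmq-externals/blob/master/rocketmq-console/README.md

    The steps are as follows:

    1. Edit the configuration file so that the management console is linked to the RocketMQ cluster.

    incubator-rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties

    The changes and the resulting file are as follows: [screenshot]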

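    2. Build rocketmq-console

    Build command: mvn clean package -Dmaven.test.skip=true (note: do not run plain mvn package, it reports many errors)

    3. Upload the built jar to the Linux server

    (If you build directly on the Linux machine, you can skip this step.)

    I uploaded it to my local virtual machine, 192.168.6.5, under /home/hadmin/jar

    4. Run the jar

    Command: java -jar target/rocketmq-console-ng-1.0.0.jar

    5. Open the management console in a browser

    Access URL: http://192.168.6.5:8080/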

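    6. Problems you may run into

    The page starts normally, but the console's monitoring log shows the following error.

    org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <192.168.1.80:10918> timeout, 5000(ms)

    The cause is that isVIPChannel defaults to true, so the console talks to RocketMQ's VIP channel; setting this property to false fixes it.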
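
    Why the log shows port 10918 and not the broker's own port: as far as I know, the RocketMQ client derives the VIP channel address by keeping the host and subtracting 2 from the broker's listen port (so the default 10911 becomes 10909). The sketch below only illustrates that arithmetic. VipChannelAddr is a hypothetical name, and the broker listen port 10920 is my inference from the log line, not something stated in the setup.

```java
/** Illustrative helper (hypothetical): shows how a VIP channel address relates to a broker address. */
class VipChannelAddr {

    /** Returns the same host with the port reduced by 2 (assumed client behaviour, see the note above). */
    static String vipChannelOf(String brokerAddr) {
        int colon = brokerAddr.lastIndexOf(':');
        if (colon <= 0 || colon == brokerAddr.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + brokerAddr);
        }
        String host = brokerAddr.substring(0, colon);
        int port = Integer.parseInt(brokerAddr.substring(colon + 1));
        return host + ":" + (port - 2);
    }

    public static void main(String[] args) {
        // Assumption: the broker in the error log listens on 10920.
        System.out.println(vipChannelOf("192.168.1.80:10920"));
        // Default broker listen port.
        System.out.println(vipChannelOf("192.168.1.80:10911"));
    }
}
```

    If nothing answers on the derived port (for example an older broker, as the comment in the properties file suggests for versions below 3.5.8, or a port that is blocked), requests to it time out, which matches the error above.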

    The configuration file after the change looks like this:

    server.contextPath=
    server.port=8080
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=192.168.1.80:9876;192.168.1.81:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=false
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/home/hadmin/data/rocketmq
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
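
    Before packaging, the edited file can be sanity-checked by loading it with java.util.Properties and reading back the two settings that matter here. This is only a sketch under my own assumptions: ConsoleConfigCheck is a hypothetical name, and the console itself reads the file through Spring Boot, not through this code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Illustrative helper (hypothetical): reads back settings from application.properties text. */
class ConsoleConfigCheck {

    /** Parses properties-file text; lines starting with '#' are ignored as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("Could not read properties text", e);
        }
        return props;
    }

    /** Falls back to true, the default stated in the file's own comment. */
    static boolean vipChannelEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("rocketmq.config.isVIPChannel", "true"));
    }

    public static void main(String[] args) {
        // Two lines copied from the configuration shown above.
        String text = "rocketmq.config.namesrvAddr=192.168.1.80:9876;192.168.1.81:9876\n"
                + "rocketmq.config.isVIPChannel=false\n";
        Properties props = parse(text);
        System.out.println("namesrvAddr  = " + props.getProperty("rocketmq.config.namesrvAddr"));
        System.out.println("isVIPChannel = " + vipChannelEnabled(props));
    }
}
```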

    Reference: https://www.cnblogs.com/quchunhui/p/7284752.html

  • Original article: https://www.cnblogs.com/cac2020/p/9447938.html