  • RocketMQ Message Queue: Single-Node Deployment and Usage

     

    Please credit the source when reposting: http://blog.csdn.net/loongshawn/article/details/51086876


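    0 A Brief Introduction to RocketMQ

    0.1 Introduction

    RocketMQ is message-oriented middleware.

    Message middleware has two roles: message producers and message consumers. RocketMQ has the same two concepts. A producer creates messages and sends them to the RocketMQ server, which persists them to disk. A consumer pulls messages from the RocketMQ server and hands them to the application for consumption.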
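The flow described above (producer sends, server stores, consumer pulls) can be sketched with a toy in-memory stand-in. This is only an illustration: `ToyBroker`, `send`, and `pull` are hypothetical names, nothing here is RocketMQ code, and a real broker persists messages to disk rather than to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory stand-in for a message server; NOT RocketMQ code.
class ToyBroker {
    // Append-only message store (a real broker would persist this to disk).
    private final List<String> store = new ArrayList<>();

    // Producer side: append a message and return the offset it was stored at.
    synchronized long send(String body) {
        store.add(body);
        return store.size() - 1;
    }

    // Consumer side: pull up to maxCount messages starting at the given offset.
    synchronized List<String> pull(long offset, int maxCount) {
        int from = (int) Math.min(Math.max(offset, 0), store.size());
        int to = (int) Math.min((long) from + maxCount, store.size());
        return new ArrayList<>(store.subList(from, to));
    }
}
```

A consumer would remember the last offset it processed and pass the next one to `pull`, which is why the stored messages stay available after they have been read.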

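    0.2 Features

    RocketMQ is a distributed message middleware based on a queue model, with the following features:

    • Supports strict message ordering
    • Supports both Topic and Queue modes
    • Can accumulate messages on the order of hundreds of millions
    • Relatively friendly distributed characteristics
    • Supports both Push and Pull consumption
    • Proven under the massive message volumes of several Tmall Double 11 shopping events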
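To make the first feature a little more concrete: as far as I understand it, ordering is only guaranteed within a single queue, so messages that must stay in order (for example, all messages of one order) are usually routed to the same queue. The sketch below shows that routing idea in plain Java. `OrderedQueueSelector` and `selectQueue` are hypothetical names; the real client exposes its own hook for this (`MessageQueueSelector`), which is not used here.

```java
// Illustration of "same key -> same queue" routing; NOT RocketMQ code.
class OrderedQueueSelector {
    // Map a business key (e.g. an order ID) to one of queueCount queues.
    static int selectQueue(String orderKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(orderKey.hashCode(), queueCount);
    }
}
```

Because the result depends only on the key and the queue count, every message carrying the same key lands in the same queue and is consumed in the order it was sent.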

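    0.3 Deployment Architecture

    [Figure: RocketMQ deployment architecture]

    The figure above shows the RocketMQ deployment architecture. The "Meta" label in the figure is RocketMQ's early code name.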

    1 RocketMQ Message Queue: Single-Node Deployment

    1.1 System Environment

    Host: Linux
    Memory: 8 GB
    Disk: 250 GB
    CPU: 4 cores

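    1.2 Required Packages and Documents

    At the time of writing, the latest package available for download on GitHub is alibaba-rocketmq-3.2.6.tar

    Download: https://github.com/alibaba/RocketMQ

    Documentation for the historical version: Metaq原理与应用.docx ("Metaq Principles and Applications")

    Note: In its early days RocketMQ was known inside Taobao as Metaq; it was renamed RocketMQ last year. That document only covers the historical Metaq version, so use it as a reference for getting familiar with the concepts.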

    1.3 Java Environment on the Server

    $java -version
    java version "1.8.0_45"
    Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
    Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

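    1.4 Installing the RocketMQ Server

    Extract the downloaded package to /opt/ (the command assumes the archive is named alibaba-rocketmq-3.2.6.tar.gz; adjust the file name to match your download)

    tar xvf alibaba-rocketmq-3.2.6.tar.gz -C /opt/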

    Configure the RocketMQ environment variables by appending the following to the end of /etc/profile

    export ROCKETMQ_HOME=/opt/alibaba-rocketmq
    export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH


    Apply the RocketMQ environment variables

    source /etc/profile

    Grant execute permission to the following commands

    cd /opt/alibaba-rocketmq/bin/;
    chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv


    Create a log directory

    cd /opt/alibaba-rocketmq
    mkdir log


    Start the name server

    nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &

    Check the process status

    $ps aux|grep java
    125233   12248 21.1  0.9 7151512 75844 pts/1   Sl   11:37   0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup

    Verify that the name server has started

    $tail -f /opt/alibaba-rocketmq/log/ng.log
    The Name Server boot success.

    Start the broker. Before starting the broker you need to specify the name server address; here 10.125.1.186 is the IP of the server it runs on

    export NAMESRV_ADDR=10.125.1.186:9876
    nohup mqbroker >/opt/alibaba-rocketmq/log/mq.log &

    Verify that mqbroker has started

    tail -f /opt/alibaba-rocketmq/log/mq.log
    
    Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
    Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
    Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    The broker[e010125001186.bja, 10.125.1.186:10911] boot success. and name server is 10.125.1.186:9876

    Finally, configure the firewall
    The name server port is 9876
    The broker port is 10911

    lokkit -p 9876:tcp -p 10911:tcp
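Once the ports are open, it can be handy to confirm from the client machine that both are actually reachable before debugging anything at the RocketMQ level. The following is a minimal sketch using only the JDK. `PortCheck` and `isReachable` are hypothetical names, the addresses are the ones used in this article, and a successful TCP connect only shows that something is listening, not that it is a healthy name server or broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to "host:port" succeeds within timeoutMillis.
    static boolean isReachable(String address, int timeoutMillis) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + address);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1));
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, unresolved host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from this article; replace with your own server IP.
        System.out.println("name server: " + isReachable("10.125.1.186:9876", 2000));
        System.out.println("broker:      " + isReachable("10.125.1.186:10911", 2000));
    }
}
```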

    Commands for shutting down the name server and the broker

    mqshutdown namesrv
    mqshutdown broker

    Shut down the name server

    mqshutdown namesrv
    The mqnamesrv(12248) is running...
    Send shutdown request to mqnamesrv(12248) OK

    Shut down the broker

    $mqshutdown broker
    The mqbroker(13634) is running...
    Send shutdown request to mqbroker(13634) OK

    [Figure: result shown after a successful installation]

    2 Using the RocketMQ Message Queue from a Java Client

    2.1 Dependency Configuration

    <!-- RocketMQ Java SDK -->
    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>

    2.2 Creating the Producer

    This class is used to obtain a singleton producer.

    package com.autonavi.rocketmq.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    
    /**
     * @author dddd
     * @description Message producer
     * @date 2016-04-07
     */
    public class Producer {
    
         /*
         * Constructs a client instance with your account for accessing DefaultMQProducer
         */
        private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        private static int initialState = 0;
    
        private Producer() {
    
        }
    
        public static DefaultMQProducer getDefaultMQProducer(){     
            if(producer == null){
                producer = new DefaultMQProducer("ProducerGroupName");          
            }
    
            if(initialState == 0){
                producer.setNamesrvAddr("10.125.1.186:9876");
                try {
                    producer.start();
                } catch (MQClientException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                    return null;
                }
    
                initialState = 1;
            }
    
            return producer;        
        }
    
    }
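One thing to be aware of: the getter above is not synchronized, so two threads calling it at the same time could both see `initialState == 0` and both try to start the producer. Below is a minimal sketch of one way to make "create and start once" thread-safe. `FakeClient` is a hypothetical stand-in for the real producer so the example runs without a broker, and `ClientHolder` is likewise an illustrative name, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client object that must be started exactly once.
class FakeClient {
    static final AtomicInteger STARTS = new AtomicInteger();

    void start() {
        STARTS.incrementAndGet();
    }
}

class ClientHolder {
    private static FakeClient instance;

    private ClientHolder() {
    }

    // synchronized makes "create + start" atomic, so concurrent callers
    // cannot start the client twice or see a client that is not started yet.
    static synchronized FakeClient get() {
        if (instance == null) {
            FakeClient client = new FakeClient();
            client.start();      // if this throws, instance stays null and the next call retries
            instance = client;
        }
        return instance;
    }
}
```

The same shape should carry over to the producer: do the `setNamesrvAddr` and `start()` calls inside the guarded block and only publish the instance once `start()` has returned. Letting the exception propagate, instead of returning null, also saves callers from a later NullPointerException.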

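    2.3 Creating the Consumer

    This class is used to obtain a singleton consumer.

    A consumer is similar to the object that operates directly on the database. For example, when a producer places an order for a train ticket, the consumer, which is always listening, picks up the order message when it arrives and carries out the actual order placement.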

    package com.autonavi.rocketmq.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    
    /**
     * @author dddd
     * @description Message consumer
     * @date 2016-04-07
     */
    public class Consumer {
    
         /*
         * Constructs a client instance with your account for accessing DefaultMQConsumer
         */
        private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        private static int initialState = 0;
    
        private Consumer() {
    
        }
    
        public static DefaultMQPushConsumer getDefaultMQPushConsumer(){     
            if(consumer == null){
                consumer = new DefaultMQPushConsumer("ConsumerGroupName");          
            }
    
            if(initialState == 0){
                consumer.setNamesrvAddr("10.125.1.186:9876");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                initialState = 1;
            }
    
            return consumer;        
        }
    
    }

    2.4 Creating the Produce and Consume Services

    package com.autonavi.rocketmq.service;
    
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    import com.autonavi.rocketmq.consumer.Consumer;
    import com.autonavi.rocketmq.producer.Producer;
    
    public class Test {
    
        private static final Logger logger = LoggerFactory.getLogger(Test.class);
    
        public static void main(String[] args){
    
            sendMsg();
        }
    
        public static void sendMsg(){
    
            // Get the message producer
            DefaultMQProducer producer = Producer.getDefaultMQProducer();
    
            try {
                for(int i=0;i<2000;i++){
                    Message msg = new Message(
                            "TopicTest1",                   // topic
                            "TagA",                         // tag
                            "OrderID00"+i,                  // key
                            ("Hello MetaQ"+i).getBytes());  // body
                    SendResult sendResult = producer.send(msg);
                    //logger.info("sendResult:{}", sendResult);
                }           
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (RemotingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (MQBrokerException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
            producer.shutdown();
        }
    
        public static void receiveMsg(){
    
            // Get the message consumer
            DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();
    
            // Subscribe to the topic
            try {
                consumer.subscribe("TopicTest1", "*");
    
                consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                    /**
                     * By default msgs contains only one message; set the consumeMessageBatchMaxSize parameter to receive messages in batches
                     */
                    public ConsumeConcurrentlyStatus consumeMessage(
                            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    
                        logger.info("currentThreadName:{} and Receive New Messages:{}",Thread.currentThread().getName(),msgs);
    
                        MessageExt msg = msgs.get(0);
    
                        if (msg.getTopic().equals("TopicTest1")) {
                            // Consumption logic for TopicTest1
                            if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                                // Handle TagA
                                logger.info("MsgBody:{}",new String(msg.getBody()));
                            } else if (msg.getTags() != null
                                    && msg.getTags().equals("TagC")) {
                                // Handle TagC
                            } else if (msg.getTags() != null
                                    && msg.getTags().equals("TagD")) {
                                // Handle TagD
                            }
                        } else if (msg.getTopic().equals("TopicTest2")) {
                            // Consumption logic for TopicTest2
                        }
    
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }           
                });
    
                /**
                 * The consumer must be initialized by calling start() before use; initializing it once is enough.
                 */
                consumer.start();                      
    
                logger.info("Consumer Started.");
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
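The listener above only looks at `msgs.get(0)` and branches on the tag with an if/else chain. That is fine for the default batch size of one, but it would silently skip messages if consumeMessageBatchMaxSize were raised. As a hedged alternative, the sketch below walks the whole batch and looks up a handler per tag. `SimpleMsg` and `TagDispatcher` are hypothetical stand-ins so the example runs without the RocketMQ client; in the real listener you would read the tag and body from `MessageExt` instead.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Minimal stand-in for a received message (tag + body); not a RocketMQ class.
class SimpleMsg {
    final String tag;
    final String body;

    SimpleMsg(String tag, String body) {
        this.tag = tag;
        this.body = body;
    }
}

class TagDispatcher {
    private final Map<String, Consumer<SimpleMsg>> handlers = new HashMap<>();

    // Register the handler to run for messages carrying the given tag.
    void register(String tag, Consumer<SimpleMsg> handler) {
        handlers.put(tag, handler);
    }

    // Handles every message in the batch; returns how many had a registered handler.
    int dispatch(List<SimpleMsg> msgs) {
        int handled = 0;
        for (SimpleMsg msg : msgs) {
            // A null or unknown tag simply finds no handler and is skipped.
            Consumer<SimpleMsg> handler = handlers.get(msg.tag);
            if (handler != null) {
                handler.accept(msg);
                handled++;
            }
        }
        return handled;
    }
}
```

Adding support for a new tag then becomes one `register` call rather than another else-if branch.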

    2.5 Test Results

    2.5.1 Producing 100 Messages

    2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C286, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=617]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C31B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=616]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C3B0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=614]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C445, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=614]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C4DA, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=618]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C56F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=617]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C604, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=615]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C699, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=615]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C72E, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=619]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C7C3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=618]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C858, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=616]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C8EF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=616]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C986, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=620]
         2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005CA1D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=619]
         ...
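In the send log above, queueId cycles 0, 1, 2, 3, 0, 1, ... which is what you would expect if the producer spreads messages round-robin over four queues of TopicTest1. The sketch below reproduces that pattern in plain Java. It is only an illustration inferred from the log, not the client's actual selection code, and `RoundRobinSelector` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of round-robin queue selection; NOT RocketMQ code.
class RoundRobinSelector {
    private final AtomicInteger counter = new AtomicInteger();
    private final int queueCount;

    RoundRobinSelector(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    // Returns 0, 1, ..., queueCount-1, 0, 1, ... on successive calls.
    int next() {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }
}
```

With this kind of distribution, messages sent one after another end up in different queues, which is consistent with the consumer log showing them arrive out of send order.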


    2.5.2 Consuming 100 Messages

    2016-04-07-16-04 [main] [com.autonavi.rocketmq.service.Test] [INFO] - Consumer Started.
         2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_11 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=618, sysFlag=0, bornTimestamp=1460016115897, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115856, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005CDA7, commitLogOffset=380327, bodyCRC=901334138, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0019, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
         2016-04-07-16-04 [ConsumeMessageThread_8] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_8 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=615, sysFlag=0, bornTimestamp=1460016115722, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115680, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C699, commitLogOffset=378521, bodyCRC=260218519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID007, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
         2016-04-07-16-04 [ConsumeMessageThread_9] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_9 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=616, sysFlag=0, bornTimestamp=1460016115773, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115734, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C8EF, commitLogOffset=379119, bodyCRC=996330568, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0011, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
         2016-04-07-16-04 [ConsumeMessageThread_3] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_3 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115669, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115629, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C445, commitLogOffset=377925, bodyCRC=149904014, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID003, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
         2016-04-07-16-04 [ConsumeMessageThread_12] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_12 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=619, sysFlag=0, bornTimestamp=1460016115951, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115911, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005D003, commitLogOffset=380931, bodyCRC=2118254247, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0023, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
         2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ19
         2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_1 and Receive New Messages:[MessageExt [queueId=1, storeSize=149, queueOffset=616, sysFlag=0, bornTimestamp=1460016115635, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115594, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C31B, commitLogOffset=377627, bodyCRC=1726036898, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID001, WAIT=true, MAX_OFFSET=641, MIN_OFFSET=0}, body=12]]]
         2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ1
         2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_18 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=625, sysFlag=0, bornTimestamp=1460016116319, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116278, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005DE2B, commitLogOffset=384555, bodyCRC=796302648, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0047, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
         2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ47
         2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_4 and Receive New Messages:[MessageExt [queueId=2, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115648, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115608, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C3B0, commitLogOffset=377776, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID002, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
         2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ2
         2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_20 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=627, sysFlag=0, bornTimestamp=1460016116436, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116393, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005E2E3, commitLogOffset=385763, bodyCRC=1482935637, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0055, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
         2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ55
         2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_2 and Receive New Messages:[MessageExt [queueId=0, storeSize=149, queueOffset=617, sysFlag=0, bornTimestamp=1460016115587, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115577, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C286, commitLogOffset=377478, bodyCRC=300288820, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID000, WAIT=true, MAX_OFFSET=642, MIN_OFFSET=0}, body=12]]]
         2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ0
         ...
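The msgId values in these logs are not random. Judging from the values shown here, the 32 hex characters break down into the broker IP (8 characters), the broker port (8 characters), and the commit log offset (16 characters). For example, 0A7D01BA00002A9F000000000005C286 reads as 10.125.1.186, port 10911, offset 377478, which matches the storeHost and commitLogOffset fields logged for the same message. The sketch below decodes that layout. It is inferred from this article's 3.x-era logs, so treat the format as an assumption for other versions; `DecodedMsgId` is a hypothetical helper, not a client class.

```java
// Decodes the msgId layout observed in the logs above:
// 8 hex chars IPv4 address + 8 hex chars port + 16 hex chars commit log offset.
class DecodedMsgId {
    final String host;
    final int port;
    final long commitLogOffset;

    private DecodedMsgId(String host, int port, long commitLogOffset) {
        this.host = host;
        this.port = port;
        this.commitLogOffset = commitLogOffset;
    }

    static DecodedMsgId decode(String msgId) {
        if (msgId == null || msgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        // First 4 bytes: one IPv4 octet per pair of hex characters.
        StringBuilder host = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                host.append('.');
            }
            host.append(Integer.parseInt(msgId.substring(2 * i, 2 * i + 2), 16));
        }
        // Next 4 bytes: port; last 8 bytes: offset in the commit log.
        int port = Integer.parseInt(msgId.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(msgId.substring(16), 16);
        return new DecodedMsgId(host.toString(), port, offset);
    }
}
```

This can be useful when reading logs from several brokers, since the msgId alone tells you which broker stored the message.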


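    3 Summary

    This article is only meant to help beginners learn how to use RocketMQ. So far it covers just a single-node setup and does not touch on cluster configuration; I will keep learning and writing things up. If you spot anything incorrect along the way, feel free to point it out.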


  • Original article: https://www.cnblogs.com/williamjie/p/9376346.html