zoukankan      html  css  js  c++  java
  • (转)阿里RocketMQ Quick Start

    转:http://blog.csdn.net/a19881029/article/details/34446629

    RocketMQ单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间,过期数据数据删除(RocketMQ中的消息队列长度不是无限的,只是足够大的内存+数据定时删除)

    RocketMQ版本:3.1.4

     

    一,部署NameServer:

    1,安装JDK并设置JAVA_HOME环境变量(启动脚本依赖JAVA_HOME环境变量)

    2,cd /alibaba-rocketmq/bin进入RocketMQ的bin目录

    2,调用nohup sh mqnamesrv &启动NameServer

    报错如下:

    [plain] view plain copy
     
    1. : command not found  
    2. : command not found  
    3. mqnamesrv: line 35: syntax error: unexpected end of file  

    在bin目录下调用dos2unix *将所有文件转化为unix格式,再次调用nohup sh mqnamesrv &

    报错如下:

    [plain] view plain copy
     
    1. /home/hadoop/alibaba-rocketmq  
    2. Invalid initial heap size: -Xms4g  
    3. The specified size exceeds the maximum representable size.  
    4. Could not create the Java virtual machine.  

    由于安装的JDK版本为32位,4g超过了JDK所支持的最大内存,不过32位JDK也无法发挥出RocketMQ的优势,换成64位JDK

    这次启动成功

    [plain] view plain copy
     
    1. [hadoop@hadoop bin]$ nohup sh mqnamesrv &  
    2. [1] 17676  
    3. [hadoop@hadoop bin]$ nohup: appending output to “nohup.out”  
    4.   
    5. [hadoop@hadoop bin]$ cat nohup.out   
    6. The Name Server boot success.  
    7. [hadoop@hadoop bin]$ jps  
    8. 17682 NamesrvStartup  
    9. 17800 Jps  

    NameServer监听端口:9876

    [java] view plain copy
     
    1. nettyServerConfig.setListenPort(9876);  

    如果服务器内存不够,可以修改runserver.sh脚本(mqnamesrv文件中通过runserver.sh脚本调用Name Server的主函数com.alibaba.rocketmq.namesrv.NamesrvStartup启动Name Server)中的JAVA_OPT_1参数

    [plain] view plain copy
     
    1. JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

    二,部署Broker:消息中转角色,负责存储消息,转发消息

    Broker集群有多种配置方式:

    1,单Master

        优点:除了配置简单没什么优点

        缺点:不可靠,该机器重启或宕机,将导致整个服务不可用

    2,多Master

        优点:配置简单,性能最高

        缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性

    3,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级

        优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预

        缺点:Master宕机或磁盘损坏时会有少量消息丢失

    4,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功

        优点:服务可用性与数据可用性非常高

        缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主

    Master和Slave的配置文件参考conf目录下的配置文件

    Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

    一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

    部署一Master一Slave,集群采用异步复制方式:

    Master:

    [plain] view plain copy
     
    1. [hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c ../conf/2m-2s-async/broker-a.properties &  
    2. [2] 25493  
    3. [hadoop@hadoop bin]$ nohup: appending output to “nohup.out”  
    4.   
    5. [hadoop@hadoop bin]$ cat nohup.out   
    6. load config properties file OK, ../conf/2m-2s-async/broker-a.properties  
    7. The broker[broker-a, 192.168.58.163:10911] boot success. and name server is 192.168.58.163:9876  
    8. [hadoop@hadoop bin]$ jps  
    9. 25500 BrokerStartup  
    10. 25545 Jps  
    11. 17682 NamesrvStartup  

    Slave:

    [plain] view plain copy
     
    1. [hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  
    2. [1] 1974  
    3. [hadoop@hadoop bin]$ nohup: appending output to “nohup.out”  
    4.   
    5. [hadoop@hadoop bin]$ cat nohup.out   
    6. load config properties file OK, ../conf/2m-2s-async/broker-a-s.properties  
    7. The broker[broker-a, 192.168.58.164:10911] boot success. and name server is 192.168.58.163:9876  
    8. [hadoop@hadoop bin]$ jps  
    9. 2071 Jps  
    10. 1981 BrokerStartup  

    Broker监听端口:10911

    [java] view plain copy
     
    1. nettyServerConfig.setListenPort(10911);  

    如果服务器内存不够,可以修改runbroker.sh脚本(mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker)的JAVA_OPT_1参数

    [plain] view plain copy
     
    1. JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  


    三,Producer

    必须要设置Name Server地址

    [java] view plain copy
     
    1. package com.sean;  
    2.   
    3. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
    4. import com.alibaba.rocketmq.client.producer.SendResult;  
    5. import com.alibaba.rocketmq.common.message.Message;  
    6.   
    7. public class Producer {  
    8.     public static void main(String[] args){  
    9.         DefaultMQProducer producer = new DefaultMQProducer("Producer");  
    10.         producer.setNamesrvAddr("192.168.58.163:9876");   
    11.         try {  
    12.             producer.start();  
    13.               
    14.             Message msg = new Message("PushTopic",   
    15.                     "push",   
    16.                     "1",   
    17.                     "Just for test.".getBytes());  
    18.               
    19.             SendResult result = producer.send(msg);  
    20.             System.out.println("id:" + result.getMsgId() +  
    21.                     " result:" + result.getSendStatus());  
    22.               
    23.             msg = new Message("PushTopic",   
    24.                     "push",   
    25.                     "2",   
    26.                     "Just for test.".getBytes());  
    27.               
    28.             result = producer.send(msg);  
    29.             System.out.println("id:" + result.getMsgId() +  
    30.                     " result:" + result.getSendStatus());  
    31.               
    32.             msg = new Message("PullTopic",   
    33.                     "pull",   
    34.                     "1",   
    35.                     "Just for test.".getBytes());  
    36.               
    37.             result = producer.send(msg);  
    38.             System.out.println("id:" + result.getMsgId() +  
    39.                     " result:" + result.getSendStatus());  
    40.         } catch (Exception e) {  
    41.             e.printStackTrace();  
    42.         }finally{  
    43.             producer.shutdown();  
    44.         }  
    45.     }  
    46. }  

    四,Consumer

    必须要设置Name Server地址

    [java] view plain copy
     
    1. package com.sean;  
    2.   
    3. import java.util.List;  
    4.   
    5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
    6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
    7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
    8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
    9. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
    10. import com.alibaba.rocketmq.common.message.Message;  
    11. import com.alibaba.rocketmq.common.message.MessageExt;  
    12.   
    13. public class Consumer {  
    14.     public static void main(String[] args){  
    15.         DefaultMQPushConsumer consumer =   
    16.                 new DefaultMQPushConsumer("PushConsumer");  
    17.         consumer.setNamesrvAddr("192.168.58.163:9876");   
    18.         try {  
    19.             //订阅PushTopic下Tag为push的消息  
    20.             consumer.subscribe("PushTopic", "push");  
    21.             //程序第一次启动从消息队列头取数据  
    22.             consumer.setConsumeFromWhere(  
    23.                     ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    24.             consumer.registerMessageListener(  
    25.                 new MessageListenerConcurrently() {  
    26.                     public ConsumeConcurrentlyStatus consumeMessage(  
    27.                             List<MessageExt> list,  
    28.                             ConsumeConcurrentlyContext Context) {  
    29.                         Message msg = list.get(0);  
    30.                         System.out.println(msg.toString());  
    31.                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    32.                     }  
    33.                 }  
    34.             );  
    35.             consumer.start();  
    36.         } catch (Exception e) {  
    37.             e.printStackTrace();  
    38.         }  
    39.     }  
    40. }  

    先运行Consumer,然后运行Producer

    Producer运行结果:

    [plain] view plain copy
     
    1. id:C0A83AA300002A9F00000000000009EA result:SEND_OK  
    2. id:C0A83AA300002A9F0000000000000A77 result:SEND_OK  
    3. id:C0A83AA300002A9F0000000000000B04 result:SEND_OK  

    Consumer运行结果:

    [plain] view plain copy
     
    1. MessageExt [queueId=1, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668792, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527374, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F0000000000000A77, commitLogOffset=2679, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=2, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]  
    2. MessageExt [queueId=0, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668698, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527356, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F00000000000009EA, commitLogOffset=2538, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=1, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]  
  • 相关阅读:
    Web前端开发——HTML概述
    Web前端开发——概述
    [nowCoder] 两个不等长数组求第K大数
    [nowCoder] 两个长度相同有序数组的中位数
    [nowCoder] 完全二叉树结点数
    [nowCoder] 子数组最大乘积
    [nowCoder] 局部最小值位置
    [LeetCode] Binary Tree Right Side View
    [C/CPP系列知识] Type difference of character literals 和 bool in C and C++
    [C/CPP系列知识] C++中extern “C” name mangling -- Name Mangling and extern “C” in C++
  • 原文地址:https://www.cnblogs.com/wangle1001986/p/6903513.html
Copyright © 2011-2022 走看看