zoukankan      html  css  js  c++  java
  • 《RocketMQ 安装和使用》

    安装Maven

    安装步骤:《Maven的安装、配置及使用入门》

      http://www.cnblogs.com/dcba1112/archive/2011/05/01/2033805.html

      http://maven.apache.org/download.cgi (apache-maven-3.3.3-bin.zip)

      Path环境变量,当我们在cmd中输入命令时,Windows首先会在当前目录中寻找可执行文件或脚本,如果没有找到,Windows会接着遍历环境变量Path中定义的路径。由于我们将%M2_HOME%in添加到了Path中,而这里%M2_HOME%实际上是引用了我们前面定义的另一个变量,其值是Maven的安装目录。因此,Windows会在执行命令时搜索目录D:inapache-maven-3.0in,而mvn执行脚本的位置就是这里。

    安装RocketMQ

    源码地址:(2015-07-18 3.2.6版本)

      E:RocketMQRocketMQ-master

      E:RocketMQRocketMQ_Workspace

      E:RocketMQRocketMQ-master argetalibaba-rocketmq-3.2.6-alibaba-rocketmqalibaba-rocketmqin

    执行下边的命令或者执行install.bat(在这个bat文件中的命令如下)对maven熟悉的一眼就知道是执行clean package install assembly等操作。

      mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U

    将RocketMQ-master导入到eclipse中

        

      进入刚生成的target文件夹下的bin目录,在命令行中执行:start mqnamesrv.exe,会弹出一个信息窗口,显示The name Server boot success 说明启动成功了,接着启动borker,在命令行中执行:start mqbroker.exe -n 127.0.0.1:9876 同样的弹出一个窗口,看到success表示成功了。

      start mqnamesrv >E:RocketMQlogsalibaba-rocketmq/mqnamesrv.log

    【遇到问题】无法启动mqnamesrv。显示软件不兼容。

      

    【如何在windows下使用linux的shell脚本】
      http://www.cygwin.com/。

      E:cygwin64。

    【当前目录调出CMD】

      在桌面上先按住Shift键,然后鼠标右键,出现选项“在此处打开命令窗口(W)”也可以打开命令行。

    生产者

     1 public class Producer{
     2     public static void main(String[] args) throws MQClientException,InterruptedException{
     3         //一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
     4         final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
     5         producer.setNamesevAddr("127.0.0.1:9876");
     6         producer.setInstanceName("Producer");
     7         
     8         producer.start();
     9         
    10         //一个Producer对象可以发送多个topic,多个tag的消息
    11         for(int i=0; i<10; i++){
    12             try{
    13                 {
    14                     Message msg = new Message("TopicTest1","TagA","OrderID001",("Hello MetaQ").getBytes());
    15                     SendResult sendResult = pro .send(msg);
    16                     System.out.println(sendResult);
    17                 }
    18                 {
    19                     Message msg = new Message("TopicTest2", "TagB","OrderID0034",("Hello MetaQB").getBytes());
    20                     SendResult sendResult = producer.send(msg);    
    21                     System.out.println(sendResult);  
    22                 }
    23                 {    
    24                     Message msg = new Message("TopicTest3",// topic    
    25                         "TagC",// tag    
    26                         "OrderID061",// key    
    27                         ("Hello MetaQC").getBytes());// body    
    28                     SendResult sendResult = producer.send(msg);    
    29                     System.out.println(sendResult);    
    30                 }   
    31             }catch(Exception e){
    32                 e.printStackTrace();
    33             }
    34             TimeUnit.MILLSECONDS.sleep(1000);
    35         }
    36         
    37       /**  
    38    * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己  
    39    * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法  
    40    */    
    41 //producer.shutdown();    
    42   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {    
    43      public void run() {    
    44         producer.shutdown();    
    45      }    
    46   }));    
    47   System.exit(0);      
    48     }
    49 
    50 }

    消费者

     1 public class Consumer {    
     2      /**   
     3      * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>   
     4      * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>   
     5      */      
     6     public static void main(String[] args) throws InterruptedException,      
     7                        MQClientException{      
     8               /**   
     9                * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>   
    10                * 注意:ConsumerGroupName需要由应用来保证唯一   
    11                */      
    12               DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(      
    13                                 "ConsumerGroupName");      
    14               consumer.setNamesrvAddr("127.0.0.1:9876");      
    15               consumer.setInstanceName("Consumber");      
    16     
    17               /**   
    18                * 订阅指定topic下tags分别等于TagA或TagC或TagD   
    19                */      
    20               consumer.subscribe("TopicTest1","TagA || TagC || TagD");      
    21               /**   
    22                * 订阅指定topic下所有消息<br>   
    23                * 注意:一个consumer对象可以订阅多个topic   
    24                */      
    25               consumer.subscribe("TopicTest2","*");      
    26     
    27               consumer.registerMessageListener(new MessageListenerConcurrently() {      
    28     
    29                        public ConsumeConcurrentlyStatus consumeMessage(      
    30                                           List<MessageExt>msgs, ConsumeConcurrentlyContext context) {      
    31     
    32                                 System.out.println(Thread.currentThread().getName()      
    33                                                    +" Receive New Messages: " + msgs.size());      
    34     
    35                                 MessageExt msg = msgs.get(0);      
    36                                 if(msg.getTopic().equals("TopicTest1")) {      
    37                                           //执行TopicTest1的消费逻辑      
    38                                           if(msg.getTags() != null && msg.getTags().equals("TagA")) {      
    39                                                    //执行TagA的消费      
    40                                                    System.out.println(new String(msg.getBody()));      
    41                                           }else if (msg.getTags() != null      
    42                                                             &&msg.getTags().equals("TagC")) {      
    43                                                    //执行TagC的消费      
    44                                                    System.out.println(new String(msg.getBody()));      
    45                                           }else if (msg.getTags() != null      
    46                                                             &&msg.getTags().equals("TagD")) {      
    47                                                    //执行TagD的消费      
    48                                                    System.out.println(new String(msg.getBody()));      
    49                                           }      
    50                                 }else if (msg.getTopic().equals("TopicTest2")) {      
    51                                           System.out.println(new String(msg.getBody()));      
    52                                 }      
    53     
    54                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      
    55     
    56                        }      
    57               });      
    58     
    59               /**   
    60                * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>   
    61                */      
    62               consumer.start();      
    63     
    64               System.out.println("ConsumerStarted.");      
    65     }      
    66 }    

     

    参考文章

    l  《RocketMQ在windows上安装和开发使用》http://blog.csdn.net/ruishenh/article/details/22390809

    l  《RocketMQ在Windows平台下环境搭建》http://www.cnblogs.com/marcotan/p/4256859.html

    l  《RocketMQ随笔分类》http://www.cnblogs.com/marcotan/category/655319.html

  • 相关阅读:
    数据库模式
    数据流模式、转换、格式与操作
    桥接模式=抽象层协作关系+继承体系注入
    php 中更简洁的三元运算符 ?:
    larave5.6 将Excel文件数据导入数据库代码实例
    Laravel获取所有的数据库表及结构
    Laravel框架数据库CURD操作、连贯操作总结
    insert into 语句的三种写法
    Laravel 上传excel,读取并写入数据库 (实现自动建表、存记录值
    laravel5.4将excel表格中的信息导入到数据库中
  • 原文地址:https://www.cnblogs.com/lsx1993/p/4656351.html
Copyright © 2011-2022 走看看