zoukankan      html  css  js  c++  java
  • JAVA代码之RocketMQ生产和消费数据

    一、启动RocketMQ

    [root@master ~]# cat /etc/hosts

    # Do not remove the following line, or various programs

    # that require network functionality will fail.

    127.0.0.1               localhost.localdomain localhost

    ::1             localhost6.localdomain6 localhost6

    192.168.1.106  node1

    192.168.1.103  master

    192.168.1.110  node2

    [root@master ~]# cd /opt/alibaba-rocketmq/bin/

    [root@master bin]# cat play.sh 

    #!/bin/sh

    #

    # Name Server

    #

    nohup sh mqnamesrv > ns.log 2>&1 &

    #

    # Service Addr

    #

    ADDR=`hostname -i`:9876

    #

    # Broker

    #

    nohup sh mqbroker -n ${ADDR} > bk.log 2>&1 &

    echo "Start Name Server and Broker Successfully, ${ADDR}"

    [root@master bin]# sh play.sh 

    Start Name Server and Broker Successfully, 192.168.1.103:9876

    [root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876

    BenchmarkTest

    DefaultCluster

    SELF_TEST_TOPIC

    %RETRY%please_rename_unique_group_name_4

    TBW102

    gaojingsong

    master

    OFFSET_MOVED_EVENT

    [root@master bin]# cd ../

    备注:此时topic不存在,但是生产数据的时候会自动创建

    二、生产和消费数据

    生产:下载

    package cn.cn.mq.demo;

    import java.util.concurrent.TimeUnit;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.common.message.Message;

    public class Producer {

       public static void main(String[] args) throws MQClientException,

             InterruptedException{

          /**

           * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>

           * 注意:ProducerGroupName需要由应用来保证唯一<br>

           * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

           * 因为服务器会回查这个Group下的任意一个Producer

           */

          final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

          producer.setNamesrvAddr("192.168.1.103:9876");

          producer.setInstanceName("Producer");

          /**

           * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>

           * 注意:切记不可以在每次发送消息时,都调用start方法

           */

          producer.start();

          /**

           * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

           * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>

           * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>

           * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

           */

          for (int i = 0; i < 3; i++){

             try {

                {

                    Message msg = new Message("TopicTest1",// topic

                          "TagA",// tag

                          "OrderID001",// key

                          ("我的名字是程序员:"+i).getBytes());// body

                    SendResult sendResult = producer.send(msg);

                    System.out.println(sendResult);

                }

                

                {

                    Message msg = new Message("TopicTest1",// topic

                          "TagC",// tag

                          "OrderID001",// key

                          ("我来测试RocketMQ:"+i).getBytes());// body

                    SendResult sendResult = producer.send(msg);

                    System.out.println(sendResult);

                }

             }catch(Exception e) {

                e.printStackTrace();

             }

             TimeUnit.MILLISECONDS.sleep(4000);

          }

          /**

           * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

           * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

           */

    //    producer.shutdown();

          Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

             public void run() {

                producer.shutdown();

             }

          }));

          System.exit(0);

       }

    }

    消费:下载

    package cn.cn.mq.demo;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.message.MessageExt;

    public class PushConsumer {

    /**

    * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>

    * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>

    */

    public static void main(String[] args) throws InterruptedException,

    MQClientException {

    /**

    * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>

    * 注意:ConsumerGroupName需要由应用来保证唯一

    */

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(

    "ConsumerGroupName");

    consumer.setNamesrvAddr("192.168.1.103:9876");

    consumer.setInstanceName("Consumber");

    /**

    * 订阅指定topic下tags分别等于TagA或TagC或TagD

    */

    consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

    /**

    * 订阅指定topic下所有消息<br>

    * 注意:一个consumer对象可以订阅多个topic

    */

    //consumer.subscribe("TopicTest2", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {

    public ConsumeConcurrentlyStatus consumeMessage(

    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

    System.out.println(Thread.currentThread().getName()

    + " Receive New Messages: " + msgs.size());

    MessageExt msg = msgs.get(0);

    if (msg.getTopic().equals("TopicTest1")) {

    // 执行TopicTest1的消费逻辑

    if (msg.getTags() != null // 执行TagA的消费

    && msg.getTags().equals("TagA")) {

    System.out.println("TagA:"+new String(msg.getBody()));

    } else if (msg.getTags() != null// 执行TagC的消费

    && msg.getTags().equals("TagC")) {

    System.out.println("TagC:"+new String(msg.getBody()));

    } else if (msg.getTags() != null// 执行TagD的消费

    && msg.getTags().equals("TagD")) {

    System.out.println("TagD:"+new String(msg.getBody()));

    }

    }  

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

    });

    /**

    * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>

    */

    consumer.start();

    System.out.println("ConsumerStarted.");

    }

    }

    三、验证消费结果  下载

    [root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876

    BenchmarkTest

    TopicTest1

    DefaultCluster

    SELF_TEST_TOPIC

    %RETRY%please_rename_unique_group_name_4

    %RETRY%ConsumerGroupName

    TBW102

    gaojingsong

    master

    OFFSET_MOVED_EVENT

    [root@master bin]# sh mqadmin  topicStatus -n  192.168.1.103:9876 -t TopicTest1

    #Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated

    master                            0     0                     4                       2016-10-20 14:38:19,236

    master                            1     0                     4                       2016-10-20 14:38:19,243

    master                            2     0                     2                       2016-10-20 14:38:15,171

    master                            3     0                     2                       2016-10-20 14:38:15,180

    [root@master bin]# shutdown -h now


     

    消费数据



     

    四、错误解决方案 下载


     -----------------------------------------------------------------------------------------------------------------------------------------------------------

    package com.chetxt.fk.app;
    
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    
    import com.alibaba.rocketmq.client.producer.SendResult;
    
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
    
        public static void main(String[] args) throws MQClientException,
    
                InterruptedException {
    
            /**
             * 
             * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
             * 
             * 注意:ProducerGroupName需要由应用来保证唯一<br>
             * 
             * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
             * 
             * 因为服务器会回查这个Group下的任意一个Producer
             * 
             */
    
            final DefaultMQProducer producer = new DefaultMQProducer("ProducerRealTimeInformation");
    
            producer.setNamesrvAddr("51.100.60.190:9006");
    
            producer.setInstanceName("Producer");
    
            /**
             * 
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             * 
             * 注意:切记不可以在每次发送消息时,都调用start方法
             * 
             */
    
            producer.start();
    
            /**
             * 
             * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
             * 
             * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
             * 
             * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
             * 
             * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
             * 
             */
    
            for (int i = 0; i < 1; i++) {
    
                try {
    
                    {
    
                        Message msg = new Message("Info", // topic
    
                                "1", // tag
    
                                "test", // key
    
                                ("testbody" + i).getBytes());// body
    
                        SendResult sendResult = producer.send(msg);
    
                        System.out.println(sendResult);
    
                    }
    
                    {
    
                        Message msg = new Message("Info", // topic
    
                                "2", // tag
    
                                "tet", // key
    
                                ("tetbody" + i).getBytes());// body
    
                        SendResult sendResult = producer.send(msg);
    
                        System.out.println(sendResult);
    
                    }
    
                } catch (Exception e) {
    
                    e.printStackTrace();
    
                }
    
                TimeUnit.MILLISECONDS.sleep(4000);
    
            }
    
            /**
             * 
             * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
             * 
             * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
             * 
             */
    
            // producer.shutdown();
    
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    
                public void run() {
    
                    producer.shutdown();
    
                }
    
            }));
    
            System.exit(0);
    
        }
    
    }
    package com.chetxt.fk.app;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class PushConsumer {
    
        /**
         * 
         * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
         * 
         * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
         * 
         */
        public static void main(String[] args) throws InterruptedException,
    
                MQClientException {
    
            /**
             * 
             * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
             * 
             * 注意:ConsumerGroupName需要由应用来保证唯一
             * 
             */
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
    
                    "ConsumerRealTimeInformation");
    
            consumer.setNamesrvAddr("51.140.60.108:9006");
    
            consumer.setInstanceName("Consumber");
    
            /**
             * 
             * 订阅指定topic下tags分别等于TagA或TagC或TagD
             * 
             */
    
            consumer.subscribe("Info", "1 || 2 || 3");
    
            /**
             * 
             * 订阅指定topic下所有消息<br>
             * 
             * 注意:一个consumer对象可以订阅多个topic
             * 
             */
    
            // consumer.subscribe("TopicTest2", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(
    
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    
                    System.out.println(Thread.currentThread().getName()
    
                            + " Receive New Messages: " + msgs.size());
    
                    MessageExt msg = msgs.get(0);
    
                    if (msg.getTopic().equals("Info")) {
    
                        // 执行TopicTest1的消费逻辑
    
                        if (msg.getTags() != null // 执行Tag1的消费
    
                                && msg.getTags().equals("1")) {
    
                            System.out.println("Tag"+msg.getTags()+":" + new String(msg.getBody()));
    
                        } else{
                            System.out.println("Tag"+msg.getTags()+":" + new String(msg.getBody()));
                            
                        }
    //                    else if (msg.getTags() != null// 执行TagC的消费
    //
    //                            && msg.getTags().equals("TagC")) {
    //
    //                        System.out.println("TagC:" + new String(msg.getBody()));
    //
    //                    } else if (msg.getTags() != null// 执行TagD的消费
    //
    //                            && msg.getTags().equals("TagD")) {
    //
    //                        System.out.println("TagD:" + new String(msg.getBody()));
    //
    //                    }
    
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
                }
    
            });
    
            /**
             * 
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             * 
             */
            consumer.start();
    
            System.out.println("ConsumerStarted.");
    
        }
    
    }
  • 相关阅读:
    【转】互联网科技大佬奋斗励志故事
    Java RestTemplate 请求参数字符串中有大括号{}的请求正确方法
    【资料最全】在100以内的所有情况,可以被写作三个数的立方和
    一个例子让你懂java里面的守护线程
    java中finally里面的代码一定会执行吗?(try里面做了return呢?)
    什么是mysql索引下推(有些装B面试官会问)
    java中静态变量指向的对象是在jvm那个区域?用图解告诉你。
    偶然发现在java方法中可以定义类
    Java里面的Comparable接口
    leetcode面试题 17.14. 最小K个数(快速排序,只排序一边)
  • 原文地址:https://www.cnblogs.com/shamo89/p/9662401.html
Copyright © 2011-2022 走看看