zoukankan      html  css  js  c++  java
  • 详解RocketMQ中的consumer

    上述就是MQ中有关Consumer的类图,下面来介绍一下每个类

     1.MQAdmin:底层类,上篇博客已经提过,就不再此重提

     2.MQConsumer:Consumer公共的接口,常用的方法如下

     如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间

       void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

     3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

    4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

     在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下

     PushConsumer:通过注册监听的方式来消费信息

    [java] view plain copy
     
     print?
    1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
    2.  * @FileName: Consumer.java    
    3.  * @Package:com.test    
    4.  * @Description: TODO   
    5.  * @author: LUCKY     
    6.  * @date:2015年12月28日 下午2:43:23    
    7.  * @version V1.0      
    8.  */  
    9. package com.test;  
    10.   
    11. import java.util.List;  
    12.   
    13. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
    14. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
    15. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
    16. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
    17. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
    18. import com.alibaba.rocketmq.common.message.Message;  
    19. import com.alibaba.rocketmq.common.message.MessageExt;  
    20.   
    21. /** 
    22.  * @ClassName: Consumer 
    23.  * @Description: 模拟消费者 
    24.  * @author: LUCKY 
    25.  * @date:2015年12月28日 下午2:43:23 
    26.  */  
    27. public class ConsumerTest {  
    28.   
    29.     public static void main(String[] args) {  
    30.         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
    31.         consumer.setNamesrvAddr("100.66.154.81:9876");  
    32.         try {  
    33.               
    34.             // 订阅PushTopic下Tag为push的消息,都订阅消息  
    35.             consumer.subscribe("PushTopic", "push");  
    36.               
    37.             // 程序第一次启动从消息队列头获取数据  
    38.             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    39.             //可以修改每次消费消息的数量,默认设置是每次消费一条  
    40.             // consumer.setConsumeMessageBatchMaxSize(10);  
    41.   
    42.             //注册消费的监听  
    43.             consumer.registerMessageListener(new MessageListenerConcurrently() {  
    44.                //在此监听中消费信息,并返回消费的状态信息  
    45.                 public ConsumeConcurrentlyStatus consumeMessage(  
    46.                         List<MessageExt> msgs,  
    47.                         ConsumeConcurrentlyContext context) {  
    48.                       
    49.                     // msgs中只收集同一个topic,同一个tag,并且key相同的message  
    50.                     // 会把不同的消息分别放置到不同的队列中  
    51.                     for(Message msg:msgs){  
    52.               
    53.                         System.out.println(new String(msg.getBody()));  
    54.                     }     
    55.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    56.                 }  
    57.             });  
    58.   
    59.             consumer.start();  
    60.             Thread.sleep(5000);  
    61.             //5秒后挂载消费端消费  
    62.             consumer.suspend();  
    63.               
    64.         } catch (Exception e) {  
    65.             e.printStackTrace();  
    66.         }  
    67.     }  
    68. }  
    69. </span>  

     PullConsumer:通过拉去的方式来消费消息

    [java] view plain copy
     
     print?
    1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
    2.  * @FileName: Consumer.java    
    3.  * @Package:com.test    
    4.  * @Description: TODO   
    5.  * @author: LUCKY     
    6.  * @date:2015年12月28日 下午2:43:23    
    7.  * @version V1.0      
    8.  */  
    9. package com.test;  
    10.   
    11. import java.util.Set;  
    12.   
    13. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
    14. import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
    15. import com.alibaba.rocketmq.common.message.MessageQueue;  
    16.   
    17. /** 
    18.  * @ClassName: Consumer 
    19.  * @Description: 模拟消费者 
    20.  * @author: LUCKY 
    21.  * @date:2015年12月28日 下午2:43:23 
    22.  */  
    23. public class ConsumerPullTest {  
    24.   
    25.     public static void main(String[] args) {  
    26.         DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
    27.         consumer.setNamesrvAddr("100.66.154.81:9876");  
    28.        consumer.setConsumerGroup("broker");  
    29.         try {  
    30.             consumer.start();  
    31.         Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");        
    32.   
    33.         for(MessageQueue messageQueue:messageQueues){  
    34.           
    35.             System.out.println(messageQueue.getTopic());  
    36.         }  
    37.           
    38.           
    39.         //消息队列的监听  
    40.         consumer.registerMessageQueueListener("", new MessageQueueListener() {  
    41.               
    42.             @Override  
    43.             //消息队列有改变,就会触发  
    44.             public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,  
    45.                     Set<MessageQueue> mqDivided) {  
    46.                 // TODO Auto-generated method stub  
    47.                   
    48.             }  
    49.         });  
    50.               
    51.       
    52.         } catch (Exception e) {  
    53.             e.printStackTrace();  
    54.         }  
    55.     }  
    56. }  
    57. </span>  


    一般在应用中都会采用push的方法来自动的消费信息

  • 相关阅读:
    Python运算符,基本数据类型
    Python2 错误记录1File "<string>", line 1, in <module> NameError: name 'f' is not defined
    用户登录三次练习
    跟我一起学Python-day1(条件语句以及初识变量)
    vim operation
    步步为营-28-事件本质
    步步为营-27-事件
    步步为营-26-多播委托
    步步为营-25-委托(比大小)
    步步为营-24-委托
  • 原文地址:https://www.cnblogs.com/wanghuaijun/p/5881043.html
Copyright © 2011-2022 走看看