zoukankan      html  css  js  c++  java
  • RocketMQ-quickstart(批量消费问题)

    基本概念:

      Producer:消息生产者,负责生产消息,一般由业务系统负责生产消息。

      Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

      Push Consumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Linsener接口方法

      Pull Consumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

      Consumer Group:一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

      Broker:消息中转角色,负责存储消息,转发消息,一般也称为Server,在JMS规范中成为Provider

      Topic: 一个Topic有四个Queue

    代码示例:

     生产者:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
    //        producer.setNamesrvAddr("192.168.31.176:9876");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // 设置实例名称
                producer.setInstanceName("quick_start_producer");
                // 设置重试次数
                producer.setRetryTimesWhenSendFailed(3);
                // 开启生产者
                producer.start();
                // 创建一条消息
                Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034", "uniform_just_for_test".getBytes());
                //目前发现3.2.6版本没有这个方法,3.5.3版本有这个方法,并且必须要设置为false否则会报错
    //            producer.setVipChannelEnabled(false);
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
                //发送,并触发回调函数
                /*producer.send(msg, new SendCallback(){
                    //成功回调函数
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult.getSendStatus());
                        System.out.println("成功");
                    }
                    //异常回调函数
                    @Override
                    public void onException(Throwable e) {
                        System.out.println("失败了" +  e.getMessage());
                    }
                });*/
                
                //获取某个主题的消息队列  
                /*List<MessageQueue> messageQueues = producer  
                        .fetchPublishMessageQueues("PushTopic");  
                System.out.println(messageQueues.size());  */
                
                
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
            producer.shutdown();
        }
    }

     消费者:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.sun.org.apache.xpath.internal.SourceTree;
    
    public class Consumer {
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            //批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);
            try {
    //            consumer.setInstanceName("quick_start_consumer");
                //3.2.6这个版本没有这个方法,3.5.3版本要设置这个方法为false,否则取不到topic
    //            consumer.setVipChannelEnabled(false);
    
                //程序第一次启动从消息队列头取数据
                //如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("PushTopic_tt1", "*");
    //            consumer.subscribe("PushTopic_tt1", "TagA || Tag B || Tage C");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        for(MessageExt msg : msgs) {
                            System.out.println("-------->" + msg.getKeys());
                            System.out.println("-------->" + msg.getMsgId());
                            System.out.println("-------->" + msg.getQueueId());
                            System.out.println("-------->" + msg.getQueueOffset());
                            System.out.println("-------->" + msg.getBody().toString());
                            System.out.println("-------->" + msg.toString());
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                consumer.start();
                System.out.println("Consumer Started."); 
    //            consumer.suspend();
                
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            
            
        }
    }

     注意:

      在Consumer端,我们用的是DefaultMQPushConsumer这个类,

      所以我们可以设置批量消费,

      但是,List<MessageExt> msgs这里还是只消费一条,所以这段代码的for循环会产生误导,直接写成MessageExt msg = msgs.get()就可以

       那不是说consumer.setConsumeMessageBatchMaxSize(10);不就是没用了么?其实是这样的,我们正常的流程一般都是先启动Consumer端,然后再启动Producer端。Consumer端都是一条一条的消费的。但是有时候会出现先启动Producer端,然后再启动Consumer端这种情况,那这个时候就是会批量消费了,这个参数就会有作用了。

     https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

    问题:

    [1] producer生产消息报 “No route info of this topic” 异常。

    解决方案:

    阿里的github issues : https://github.com/alibaba/RocketMQ/issues/44

    网上参考,把rocketmq的配置文件broker-a.properties中的autoCreateTopicEnable值改成true

  • 相关阅读:
    延时函数出错,volatile一例
    【转】STM32中的抢占优先级、响应优先级概念
    【转载】串口中怎样接收一个完整数据包的解析
    ARM-ContexM3/4组优先级和子优先级抢占规则
    【转载】Keil中的USE MicroLib说明
    线程让出实验【RT-Thread学习笔记 4】
    线程优先级抢占实验【RT-Thread学习笔记 3】
    RT-Thread的线程(任务)处理【RT-Thread学习笔记 2】
    熟悉RT-Thread的软硬件环境【RT-Thread学习笔记 1】
    RT-Thread下的串口驱动程序分析【转载】
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8169072.html
Copyright © 2011-2022 走看看