zoukankan      html  css  js  c++  java
  • RocketMQ-广播模式消费

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息。代码实现上其实很简单,就是在消费端添加

    consumer.setMessageModel(MessageModel.BROADCASTING);

    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class ProducerBroadCast {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // 设置实例名称
                producer.setInstanceName("producer_broadcast");
                // 设置重试次数
                producer.setRetryTimesWhenSendFailed(3);
                // 开启生产者
                producer.start();
                // 创建一条消息
                Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes());
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
                
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
            producer.shutdown();
        }
    }

    Consumer端:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class ConsumerBroadCastMember1 {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // 批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置广播消费
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //设置集群消费
    //        consumer.setMessageModel(MessageModel.CLUSTERING);
            // 如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 订阅PushTopic下Tag为push的消息
            consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer1 Started.");
    
        }
    }
    class MqBroadCastListener implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }
    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class ConsumerBroadCastMember2 {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // 批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置广播消费
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //设置集群消费
    //        consumer.setMessageModel(MessageModel.CLUSTERING);
            // 如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 订阅PushTopic下Tag为push的消息
            consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer2 Started.");
    
        }
    }

    结果:

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

  • 相关阅读:
    【转】SVN与Git比较
    我遇到了Hibernate异常
    使用 Eclipse 远程调试 Java 应用程序
    linux显示桌面快捷键设置
    Ubuntu共享WiFi(AP)给Android方法
    用zd1211+Ubuntu 10.04实现的AP
    Ubuntu开机自动禁用无线网络
    戴尔大力宣传Ubuntu 对比与Windows的差异
    【SSH进阶之路】Spring的AOP逐层深入——采用注解完成AOP(七)
    【SSH进阶之路】Spring的AOP逐层深入——AOP的基本原理(六)
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8215552.html
Copyright © 2011-2022 走看看