zoukankan      html  css  js  c++  java
  • RocketMQ-广播模式消费

    RocketMQ 的消费者默认以集群模式消费,也可以改用广播模式。广播模式下,订阅同一主题的每个消费者实例都会收到每一条消息(而集群模式下,同一消费组内一条消息只会被其中一个实例消费)。代码实现很简单,只需在消费端添加

    consumer.setMessageModel(MessageModel.BROADCASTING);

    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBroadCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * Demo producer: sends one test message to the {@code topic_broadcast} topic so that the
     * broadcast consumers can be observed receiving it.
     */
    public class ProducerBroadCast {
        /**
         * Starts a producer, synchronously sends a single message tagged {@code TagA}, prints the
         * returned message id and send status, and always shuts the producer down afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // NOTE(review): the producer group is literally named "push_consumer"; kept as-is,
            // but it looks like a leftover name - confirm whether it is intentional.
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // Instance name of this producer client.
            producer.setInstanceName("producer_broadcast");
            // Number of retries when a send fails.
            producer.setRetryTimesWhenSendFailed(3);
            try {
                producer.start();
                // Encode explicitly as UTF-8 instead of the platform default charset, because
                // the listener in this article decodes the body as "utf-8".
                byte[] body = "message_broadcast_test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
                // topic, tag, key, body
                Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", body);
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers/thread owners can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Release the client even if an unchecked exception escapes the try block.
                producer.shutdown();
            }
        }
    }

    Consumer端:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * First demo consumer of {@code topic_broadcast}, configured for broadcast consumption.
     */
    public class ConsumerBroadCastMember1 {
        /**
         * Configures and starts a push consumer in the {@code consumer_broadcast} group and
         * registers {@link MqBroadCastListener} to print every received message body.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // Batch consumption: hand at most 10 messages to the listener per callback.
            consumer.setConsumeMessageBatchMaxSize(10);
            // Broadcast consumption. Use MessageModel.CLUSTERING here instead to switch this
            // consumer to cluster consumption.
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Start from the first offset; per the original author's note, when this is not the
            // first start, consumption continues from the previously consumed position.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to topic_broadcast for tags TagA, TagB and TagC. The original expression
            // read "Tag B" and "Tage C", which look like typos of the intended tag names.
            consumer.subscribe("topic_broadcast", "TagA || TagB || TagC");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer1 Started.");
        }
    }
    class MqBroadCastListener implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }
    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * Second demo consumer of {@code topic_broadcast}, configured for broadcast consumption.
     * It uses the same consumer group as the first consumer.
     */
    public class ConsumerBroadCastMember2 {
        /**
         * Configures and starts a push consumer in the {@code consumer_broadcast} group and
         * registers {@link MqBroadCastListener} to print every received message body.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // Batch consumption: hand at most 10 messages to the listener per callback.
            consumer.setConsumeMessageBatchMaxSize(10);
            // Broadcast consumption. Use MessageModel.CLUSTERING here instead to switch this
            // consumer to cluster consumption.
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Start from the first offset; per the original author's note, when this is not the
            // first start, consumption continues from the previously consumed position.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to topic_broadcast for tags TagA, TagB and TagC. The original expression
            // read "Tag B" and "Tage C", which look like typos of the intended tag names.
            consumer.subscribe("topic_broadcast", "TagA || TagB || TagC");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer2 Started.");
        }
    }

    结果:

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

  • 相关阅读:
    HDU 1863 畅通工程(Kruskal)
    HDU 1879 继续畅通工程(Kruskal)
    HDU 1102 Constructing Roads(Kruskal)
    POJ 3150 Cellular Automaton(矩阵快速幂)
    POJ 3070 Fibonacci(矩阵快速幂)
    ZOJ 1648 Circuit Board(计算几何)
    ZOJ 3498 Javabeans
    ZOJ 3490 String Successor(模拟)
    Java实现 LeetCode 749 隔离病毒(DFS嵌套)
    Java实现 LeetCode 749 隔离病毒(DFS嵌套)
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8215552.html
Copyright © 2011-2022 走看看