zoukankan      html  css  js  c++  java
  • RocketMQ-广播模式消费

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息。代码实现上其实很简单,就是在消费端添加

    consumer.setMessageModel(MessageModel.BROADCASTING);

    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class ProducerBroadCast {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // 设置实例名称
                producer.setInstanceName("producer_broadcast");
                // 设置重试次数
                producer.setRetryTimesWhenSendFailed(3);
                // 开启生产者
                producer.start();
                // 创建一条消息
                Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes());
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
                
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
            producer.shutdown();
        }
    }

    Consumer端:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class ConsumerBroadCastMember1 {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // 批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置广播消费
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //设置集群消费
    //        consumer.setMessageModel(MessageModel.CLUSTERING);
            // 如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 订阅PushTopic下Tag为push的消息
            consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer1 Started.");
    
        }
    }
    class MqBroadCastListener implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }
    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class ConsumerBroadCastMember2 {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // 批量消费,每次拉取10条
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置广播消费
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //设置集群消费
    //        consumer.setMessageModel(MessageModel.CLUSTERING);
            // 如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 订阅PushTopic下Tag为push的消息
            consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer2 Started.");
    
        }
    }

    结果:

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

  • 相关阅读:
    有点忙啊
    什么是协程
    HDU 1110 Equipment Box (判断一个大矩形里面能不能放小矩形)
    HDU 1155 Bungee Jumping(物理题,动能公式,弹性势能公式,重力势能公式)
    HDU 1210 Eddy's 洗牌问题(找规律,数学)
    HDU1214 圆桌会议(找规律,数学)
    HDU1215 七夕节(模拟 数学)
    HDU 1216 Assistance Required(暴力打表)
    HDU 1220 Cube(数学,找规律)
    HDU 1221 Rectangle and Circle(判断圆和矩形是不是相交)
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8215552.html
Copyright © 2011-2022 走看看