zoukankan      html  css  js  c++  java
  • RocketMQ-广播模式消费

    RocketMQ 消费者默认以集群模式消费,也可以改用广播模式消费。广播模式下,所有订阅同一个主题的消费者都会收到每一条消息。代码实现上其实很简单,就是在消费端添加

    consumer.setMessageModel(MessageModel.BROADCASTING);

    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBroadCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

    package org.hope.lee.producer;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendCallback;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * Demo producer: starts a {@link DefaultMQProducer}, sends one message to the
     * {@code topic_broadcast} topic with tag {@code TagA}, prints the send result and
     * shuts the producer down again.
     */
    public class ProducerBroadCast {
        /**
         * Sends a single test message and prints its message id and send status.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
            producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            try {
                // Set the instance name.
                producer.setInstanceName("producer_broadcast");
                // Set the number of retries when a send fails.
                producer.setRetryTimesWhenSendFailed(3);
                // Start the producer.
                producer.start();
                // Build the message. The body is encoded explicitly as UTF-8 so it does not
                // depend on the platform default charset of the machine running the producer.
                Message msg = new Message("topic_broadcast", "TagA", "OrderID0034",
                        "message_broadcast_test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                SendResult send = producer.send(msg);
                System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always release the client, even when an unchecked exception escapes the try block.
                producer.shutdown();
            }
        }
    }

    Consumer端:

    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * First demo consumer of the {@code consumer_broadcast} group. It subscribes to
     * {@code topic_broadcast} in broadcasting mode and hands received messages to
     * {@link MqBroadCastListener}.
     */
    public class ConsumerBroadCastMember1 {
        /**
         * Configures and starts the push consumer.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // Batch consumption: up to 10 messages are passed to the listener per invocation.
            consumer.setConsumeMessageBatchMaxSize(10);
            // Use broadcast consumption; switch to MessageModel.CLUSTERING for cluster consumption.
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Start from the first offset; per the original author's note, a restart resumes
            // from the last consumed position instead.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to topic_broadcast for messages tagged TagA, TagB or TagC.
            // The tag names must match the producer's tags exactly (no embedded spaces).
            consumer.subscribe("topic_broadcast", "TagA || TagB || TagC");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer1 Started.");
        }
    }
    class MqBroadCastListener implements MessageListenerConcurrently{
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                MessageExt msg = msgs.get(0);
                String msgBody = new String(msg.getBody(), "utf-8");
                System.out.println("msgBody:" + msgBody);
            } catch(Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        
    }
    package org.hope.lee.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * Second demo consumer of the {@code consumer_broadcast} group. It subscribes to
     * {@code topic_broadcast} in broadcasting mode and hands received messages to
     * {@link MqBroadCastListener}.
     */
    public class ConsumerBroadCastMember2 {
        /**
         * Configures and starts the push consumer.
         *
         * @param args unused
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
            consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
            // Batch consumption: up to 10 messages are passed to the listener per invocation.
            consumer.setConsumeMessageBatchMaxSize(10);
            // Use broadcast consumption; switch to MessageModel.CLUSTERING for cluster consumption.
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Start from the first offset; per the original author's note, a restart resumes
            // from the last consumed position instead.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to topic_broadcast for messages tagged TagA, TagB or TagC.
            // The tag names must match the producer's tags exactly (no embedded spaces).
            consumer.subscribe("topic_broadcast", "TagA || TagB || TagC");
            consumer.registerMessageListener(new MqBroadCastListener());
            consumer.start();
            System.out.println("Consumer2 Started.");
        }
    }

    结果:

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

  • 相关阅读:
    题解 CF432D 【Prefixes and Suffixes】
    题解 P3199 【[HNOI2009]最小圈】
    题解 AT1221 【水筒】
    题解 P4381 【[IOI2008]Island】
    题解 CF1242B 【0-1 MST】
    《数据库系统概论》 -- 3.1 SQL概论和DDL
    《数据库系统概论》 -- 2. 关系操作
    《数据库系统概论》 -- 1. 绪论
    配置vim成为IDE
    ubuntu--软件管理
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8215552.html
Copyright © 2011-2022 走看看