zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(15)----RocketMQ的消息模式

    在前面学习ActiveMQ时,看到ActiveMQ可以是队列消息模式,也可以是订阅发布模式。

      同样,在RocketMQ中,也存在两种消息模式,即是集群消费模式和广播消费模式。

    1. 集群消费模式

      跟AciiveMQ一样,当存在多个消费者时,消息通过一定负载均衡策略,将消息分发到多个consumer中。

      如图:

      

      在RockeMQ中,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!

      消息分发也是有多个策略可以配置的,配置方式如下:

      可以使用setAllocateMessageQueueStrategy()方法传入如下参数实现不同的负载均衡策略,默认AllocateMessageQueueAveragely,轮询算法策略。

      

      RocketMQ默认的消息模式就是集群模式。

      开启两个不同的consumer,控制台打印结果如下:

      consumer1

    收到来自topic: MyTopic,的消息:2
    收到来自topic: MyTopic,的消息:3
    收到来自topic: MyTopic,的消息:4
    收到来自topic: MyTopic,的消息:5

      consumer2:

    收到来自topic: MyTopic,的消息:0
    收到来自topic: MyTopic,的消息:1
    收到来自topic: MyTopic,的消息:6
    收到来自topic: MyTopic,的消息:7
    收到来自topic: MyTopic,的消息:8
    收到来自topic: MyTopic,的消息:9

      可以看出消息是被分发给两个消费者的,可以通过consumer.setMessageModel(MessageModel.CLUSTERING);设置集群消费策略。

    2. 广播模式

      广播模式跟ActiveMQ的发布订阅一样,即是将所有消息分发给Consume Group中每个消费者消费。

      代码实现如下:

    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup");
    
            consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.subscribe("MyTopic", "*");
            //设置消费模式为广播模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        MessageExt ext = msgs.get(0);
                        String topic = ext.getTopic();
                        String body = new String(ext.getBody(),"utf-8");
                        System.out.println("收到来自topic: " + topic + ",的消息:" + body);
                    } catch (Exception e) {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
    
            });
            consumer.start();
        }
    }

      启动两个消费者,每个消费者都能接收到所有消息,控制台打印如下:

    收到来自topic: MyTopic,的消息:0
    收到来自topic: MyTopic,的消息:1
    收到来自topic: MyTopic,的消息:2
    收到来自topic: MyTopic,的消息:3
    收到来自topic: MyTopic,的消息:4
    收到来自topic: MyTopic,的消息:5
    收到来自topic: MyTopic,的消息:6
    收到来自topic: MyTopic,的消息:7
    收到来自topic: MyTopic,的消息:8
    收到来自topic: MyTopic,的消息:9

    原文 RocketMQ学习笔记(15)----RocketMQ的消息模式

  • 相关阅读:
    正则表达式语法
    Linux之Shell脚本计算命令行的所有和
    Linux之匹配符
    Linux之ls命令
    Linux之Shell的算术运算
    Linux 之 shell 比较运算符
    tensorboard的使用
    模型训练减少随机性
    keras 下载预训练模型报错SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:852)
    Deep face recognition: a survey v4
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10867628.html
Copyright © 2011-2022 走看看