在前面学习ActiveMQ时,看到ActiveMQ可以是队列消息模式,也可以是订阅发布模式。
同样,在RocketMQ中,也存在两种消息模式,即是集群消费模式和广播消费模式。
1. 集群消费模式
跟AciiveMQ一样,当存在多个消费者时,消息通过一定负载均衡策略,将消息分发到多个consumer中。
如图:
在RockeMQ中,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!
消息分发也是有多个策略可以配置的,配置方式如下:
可以使用setAllocateMessageQueueStrategy()方法传入如下参数实现不同的负载均衡策略,默认AllocateMessageQueueAveragely,轮询算法策略。
RocketMQ默认的消息模式就是集群模式。
开启两个不同的consumer,控制台打印结果如下:
consumer1
收到来自topic: MyTopic,的消息:2 收到来自topic: MyTopic,的消息:3 收到来自topic: MyTopic,的消息:4 收到来自topic: MyTopic,的消息:5
consumer2:
收到来自topic: MyTopic,的消息:0 收到来自topic: MyTopic,的消息:1 收到来自topic: MyTopic,的消息:6 收到来自topic: MyTopic,的消息:7 收到来自topic: MyTopic,的消息:8 收到来自topic: MyTopic,的消息:9
可以看出消息是被分发给两个消费者的,可以通过consumer.setMessageModel(MessageModel.CLUSTERING);设置集群消费策略。
2. 广播模式
广播模式跟ActiveMQ的发布订阅一样,即是将所有消息分发给Consume Group中每个消费者消费。
代码实现如下:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup"); consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("MyTopic", "*"); //设置消费模式为广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt ext = msgs.get(0); String topic = ext.getTopic(); String body = new String(ext.getBody(),"utf-8"); System.out.println("收到来自topic: " + topic + ",的消息:" + body); } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
启动两个消费者,每个消费者都能接收到所有消息,控制台打印如下:
收到来自topic: MyTopic,的消息:0 收到来自topic: MyTopic,的消息:1 收到来自topic: MyTopic,的消息:2 收到来自topic: MyTopic,的消息:3 收到来自topic: MyTopic,的消息:4 收到来自topic: MyTopic,的消息:5 收到来自topic: MyTopic,的消息:6 收到来自topic: MyTopic,的消息:7 收到来自topic: MyTopic,的消息:8 收到来自topic: MyTopic,的消息:9