根据使用者对读取操作的控制情况,消费者可分为两种类型。一个是 DefaultMQPushConsumer ,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;另一个是DefaultMQPullConsumer ,读取操作中的大部分功 能由使用者自主控制。
public class QuickStart { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("chapter3PushConsumer"); consumer.setNamesrvAddr("192.168.1.3:9876;192.168.1.10:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + "Receive New Messages :" + msgs + " %n "); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
DefaultMQPushConsumer 需要设置三个参数: 一是这个Consumer 的 GroupName ,二是NameServer 的地址和端口号,三是Topic 的名称, 下面将分 别进行详细介绍。 1 ) Consumer 的GroupName 用于把多个Consumer 组织到一起, 提高并发 处理能力, GroupName 需要和消息模式( MessageModel )配合使用。 RocketMQ 支持两种消息模式: Clustering 和Broadcasting 。 口在Clustering 模式下,同一个ConsumerGroup ( GroupN ame 相同) 里的每 个Consumer 只消费所订阅消息的一部分内容, 同一个ConsumerGroup 里所有的Consumer 消费的内容合起来才是所订阅Topic 内容的整体, 从而达到负载均衡的目的。 口在Broadcasting 模式下,同一个ConsumerGroup 里的每个Consumer 都 能消费到所订阅Topic 的全部消息,也就是一个消息会被多次分发,被 多个Consumer 消费。 2) NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除 单点故障的目的, 比如“ ip1 :port;ip2:port;ip3 :port ” 。 3 ) Topic 名称用来标识消息类型, 需要提前创建。如果不需要消费某 个Topic 下的所有消息,可以通过指定消息的Tag 进行消息过滤,比如: Consumer. subscribe (”Topic Test”J ’tagl 11 tag2 11 tag3 ”), 表示这个Consumer 要 消费“ TopicTest ”下带有tagl 或tag2 或tag3 的消息( Tag 是在发送消息时设 置的标签) 。在填写Tag 参数的位置,用null 或者“ V 表示要消费这个Topic 的所有消息。
这章的重点是
1、两种消费方式,
2、Clustering 模式和Broadcasting模式
3、同步发送和异步发送
4、group的概念
5、offset的含义和使用,这个不是很清楚