双Master方式:
服务器环境
序号 | IP | 角色 | 模式 |
1 | 192.168.32.135 | nameServer1,brokerServer1 | Master1 |
2 | 192.168.32.136 | nameServer2,brokerServer2 | Master2 |
Helloworld代码示例:
/** * Producer,发送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer"); producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } /** * Consumer,订阅消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); try { for(MessageExt msg : msgs){ String topic = msg.getTopic(); String msgBody = new String(msg.getBody(),"utf-8"); String tags = msg.getTags(); System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
启动消费者,生产者,结果如下:
问题:consumeMessage方法的参数是List<MessageExt> msgs,说明消费端是可以一次消费多条消息的,那么其中的一条消息处理失败后,怎么处理呢?
回答:1. 默认情况下消费端一次只消费一条消息,可以通过consumer.setConsumeMessageBatchMaxSize(10),来修改。
2. 如果是先启动消费端,再启动生产端,即使通过上面设置了,消费端也是一次消费一条消息。
3. 如果显示启动生产端,再启动消费端,消费端会一次消费多条消息,例如一次拉取了5条消息,如果在第四条失败了,则这五条消息会全部重试,所以这种情况需要在业务逻辑中去去除重复消息。
问题:消息是怎么重试的?
回答:1. 生成端重试:producer.setRetryTimesWhenSendFailed(3);producer.send(msg,1000); 通过前面两行代码能实现生产端重试,意思是如果消息发送超过了超时时间(1000),重发3次
2.消费端重试:分两种,一是timeout,而是exception
timeout是指MQ没有收到消费端的返回效应(RECONSUME_LATER/CONSUME_SUCCESS),这是MQ会一直给消费端发送消息重试。
exception是指消费端在消费的过程中发生异常,给MQ返回了RECONSUME_LATER,这是MQ会按照一定的规则给消费端发送消息重试。
默认重试规则:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
测试代码示例:
/** * Producer,发送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer"); producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876"); producer.setRetryTimesWhenSendFailed(3); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg,1000); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } /** * Consumer,订阅消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); //默认为1 consumer.setConsumeMessageBatchMaxSize(10); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); try { System.out.println("消息条数:" + msgs.size()); for(MessageExt msg : msgs){ String topic = msg.getTopic(); String msgBody = new String(msg.getBody(),"utf-8"); String tags = msg.getTags(); System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags); if("Hello RocketMQ 4".equals(msgBody)){ System.out.println("=====失败消息开始====="); System.out.println(msg); System.out.println(msgBody); System.out.println("=====失败消息结束====="); int i = 6/0; } } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }