什么是幂等性:
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就实现可消息幂等。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。
如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,
且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
实例:
在RocketMQ中因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。
最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。
下面模拟:
生产者
package com.wish.retry; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class RetryProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("retry_rmq-group"); producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876"); producer.setInstanceName("retry_producer"); producer.start(); try { for (int i = 0; i < 1; i++) { Thread.sleep(1000); // 每秒发送一次MQ Message msg = new Message("itmayiedu-topic", // topic 主题名称 "TagA", // tag 临时值 ("retry_itmayiedu-6" + i).getBytes()// body 内容 ); msg.setKeys(System.currentTimeMillis() + ""); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
消费者
package com.wish.retry; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.HashMap; import java.util.List; import java.util.Map; public class RetryConsumer { static private Map<String, String> logMap = new HashMap<>(); public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry_rmq-group"); consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876"); consumer.setInstanceName("retry_consumer"); consumer.subscribe("retry_itmayiedu-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { String key = null; String msgId = null; try { for (MessageExt msg : msgs) { key = msg.getKeys(); if (logMap.containsKey(key)) { // 无需继续重试。 System.out.println("key:"+key+",无需重试..."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } msgId = msg.getMsgId(); System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody())); int i = 1 / 0; } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { logMap.put(key, msgId); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("RetryConsumer Started."); } }