幂等指的就是执行多次和执行一次的效果相同,主要是为了防止数据重复消费。MQ中为了保证消息的可靠性,生产者发送消息失败(例如网络超时)会触发 "重试机制",它不是生产者重试而是MQ自动触发的重试机制, 而这种情况下消费者就会收到两条消息,比如明明只需要扣一次款, 可是消费者却执行了2次。为了解决幂等问题,每一个消息应该有一个全局的唯一的标识,当处理过这条消息后,就把这个标识保存到数据库或者redis中,在处理消息前前判断这个标识记录为空就好了。像activemq中msgId就是唯一的,我们可以直接拿这个id来判断,但是rocketmq重试机制不一样,它重发会产生一个新的id,但是它提供了setKeys()这个api,我们可以给key设置一个唯一的流水编号来加以判断。(重试机制是不存在并发问题的,它是间隔一段时间自动促发的)。
1. 导入依赖( 生产者和消费者的依赖都一样)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.7.RELEASE</version> <relativePath/> </parent> <!-- springcloud <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- webmvc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 集成lombok 框架(get/set) --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- RocketMq --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <!-- 热加载 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!-- jackson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.30</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
2. 生产者配置参数和配置文件
#该应用是否启用生产者
#rocketmq.producer.isOnOff=on
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.producer.groupName=mqtest
#mq的nameserver地址
rocketmq.producer.namesrvAddr=192.168.5.7:9876
#消息最大长度 默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=3
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; /** * 生产者配置 */ @PropertySource("classpath:rocketmq.properties") @Configuration public class MQProducerConfiguration { public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class); /** * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 */ @Value("${rocketmq.producer.groupName}") private String groupName; /** 服务器地址 */ @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; /** * 消息最大大小,默认4M */ @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; /** * 消息发送超时时间,默认3秒 */ @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; /** * 消息发送失败重试次数,默认2次 */ @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName //producer.setInstanceName(instanceName); producer.setMaxMessageSize(this.maxMessageSize); producer.setSendMsgTimeout(this.sendMsgTimeout); //如果发送消息失败,设置重试次数,默认为2次 producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); try { producer.start(); LOGGER.info(String.format("rocketmq producer start ")); } catch (MQClientException e) { LOGGER.error(String.format("producer is error {}", e.getMessage(),e)); } return producer; } }
3. 生产者发送消息
import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import lombok.extern.slf4j.Slf4j; @RestController @Slf4j public class TestController { /**使用RocketMq的生产者*/ @Autowired private DefaultMQProducer defaultMQProducer; @RequestMapping("/send") public void send(){ String msg = "幂等"; log.info("开始发送消息:"+msg); try { // arg0主题名称 arg1分组 arg2内容 Message sendMsg = new Message("DemoTopic","wulei",(msg).getBytes()); // 注意: activemq的msgId是唯一的,但是rocketmq的不是,所以幂等不能用id来判断,我们可以通过setKeys来解决,一般都是业务id,这里用随机数代替。 sendMsg.setKeys(UUID.randomUUID().toString()); SendResult sendResult = defaultMQProducer.send(sendMsg); //默认3秒超时 log.info("消息发送响应信息:"+sendResult.toString()); } catch (Exception e) { e.printStackTrace(); } } }
4. 消费者配置参数和配置文件
##该应用是否启用消费者 #rocketmq.consumer.isOnOff=on #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 rocketmq.consumer.groupName=mqtest #mq的nameserver地址 rocketmq.consumer.namesrvAddr=192.168.5.7:9876 #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*; rocketmq.consumer.topics=DemoTopic~*; rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64 #设置一次消费消息的条数,默认为1条 rocketmq.consumer.consumeMessageBatchMaxSize=1
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.wulei.listener.MQConsumeMsgListenerProcessor; /** * 消费者配置 */ @PropertySource("classpath:rocketmq.properties") @Configuration public class MQConsumerConfiguration { public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); // 地址 @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; // 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 @Value("${rocketmq.consumer.groupName}") private String groupName; // 该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags) @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Autowired private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(mqMessageListenerProcessor); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 设置消费模型,集群还是广播,默认为集群 */ //consumer.setMessageModel(MessageModel.CLUSTERING); /** * 设置一次消费消息的条数,默认为1条 */ consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { /** * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3 */ String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); }catch (MQClientException e){ LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); } return consumer; } }
5. 消费者监听消息
import java.util.HashMap; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{ private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); // 假装这是一个redis private HashMap<String, String> myredis = new HashMap<String, String>(); /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/> * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS */ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // if(CollectionUtils.isEmpty(msgs)){ // logger.info("接受到的消息为空,不处理,直接返回成功"); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } // //for(MessageExt messageExt : msgs) { } MessageExt messageExt = msgs.get(0); String keys = messageExt.getKeys();// 自定义的唯一key String msgId = null; // 消息id(不是唯一的) String msgContext = null; // 消息内容 int reconsume = 0; // 重试次数 if(messageExt.getTopic().equals("DemoTopic") && messageExt.getTags().equals("wulei")){ if(myredis.get(keys)==null) { //logger.info("接受到的消息为:"+messageExt.toString()); msgId = messageExt.getMsgId(); msgContext = new String(messageExt.getBody()); reconsume = messageExt.getReconsumeTimes(); try { int i = 1/0; System.out.println("消费成功: id:"+msgId+" msg"+msgContext+" 次数"+reconsume); myredis.put(messageExt.getKeys(), msgContext); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 重试3次就不在重试了,直接返回消费成功状态,并触发人工补偿机制。 if(reconsume==2) { myredis.put(messageExt.getKeys(), msgContext); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else { // 一般消费者这边尽量不要抛异常,它失败就会触发重试机制。如果非要抛异常可以在try{}catch{}里面return ConsumeConcurrentlyStatus.RECONSUME_LATER(表示失败让他重试) return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }else { // 已经消费过就不要再重试了,直接返回成功。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }else { // 不存在不要再重试了,直接返回成功。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } }