1、pom文件
<!-- Parent POM: supplies the version for dependencies declared below without an explicit <version>. -->
<!-- NOTE(review): applications normally inherit from spring-boot-starter-parent; confirm that
     spring-boot-starters is really the intended parent artifact. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starters</artifactId>
    <version>2.1.3.RELEASE</version>
</parent>
<dependencies>
    <!-- Spring Boot web starter; no version here, so it is taken from the parent. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- RocketMQ client library, pinned explicitly to 4.4.0. -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
</dependencies>
2、yml文件
server:
  port: 9001
spring:
  application:
    name: ware01-rocket-queue
rocketmq:
  # Producer settings
  producer:
    isOnOff: on
    # Producers that send the same kind of message share one group name
    groupName: rocketGroup
    # Name server address
    namesrvAddr: 127.0.0.1:9876
    # Maximum message size in bytes: 4 MB = 1024 * 1024 * 4
    # (was 4096, which is only 4 KB and contradicts the intended 4M limit)
    maxMessageSize: 4194304
    # Send timeout in milliseconds, default 3000
    sendMsgTimeout: 3000
    # Number of retries after a failed send, default 2
    retryTimesWhenSendFailed: 2
  # Consumer settings
  consumer:
    isOnOff: on
    # Official advice: every consumer in the same group should subscribe to the same topics
    groupName: rocketGroup
    # Name server address
    namesrvAddr: 127.0.0.1:9876
    # Entries are "topic~tagExpression" separated by ';'; '*' receives every tag of the topic
    topics: rocketTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    # Maximum number of messages handed to the listener per call, default 1
    consumeMessageBatchMaxSize: 1
# Group / topic / tag used by the sample sender and listener
rocket:
  group: rocketGroup
  topic: rocketTopic
  tag: rocketTag
3、rocketMQ配置
3.1 RocketMQ 生产者配置
package com.boot.rocket.queue.rocket; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by Administrator on 2020/6/19 0019. * RocketMQ 生产者配置 */ @Configuration public class ProducerConfig { private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer(){ DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(namesrvAddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName // 消息最大长度 默认1024*4(4M) if(null != maxMessageSize){ producer.setMaxMessageSize(maxMessageSize); } // 发送消息超时时间,默认3000 if(null != sendMsgTimeout){ producer.setSendMsgTimeout(sendMsgTimeout); } //如果发送消息失败,设置重试次数,默认为2次 if(null != retryTimesWhenSendFailed){ producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
3.2 RocketMQ 消费者配置
package com.boot.rocket.queue.rocket; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * Created by Administrator on 2020/6/19 0019. * RocketMQ 消费者者配置 */ @Configuration public class ConsumerConfig { private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class) ; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Resource private RocketMsgListener msgListener; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(msgListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } }
3.3 RocketMQ 消费者监听器配置
package com.boot.rocket.queue.rocket;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

/**
 * Created by Administrator on 2020/6/19 0019.
 *
 * <p>RocketMQ consumer listener: logs every received message and dispatches on its tag when
 * the message belongs to the configured {@code rocket.topic}.
 */
@Component
public class RocketMsgListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(RocketMsgListener.class);

    /** Messages redelivered at least this many times are acknowledged without processing. */
    private static final int MAX_RECONSUME_TIMES = 3;

    @Value("${rocket.topic}")
    public String rocketTopic;

    /**
     * Handles a batch of messages delivered by the push consumer.
     *
     * @param list the delivered messages; may be empty
     * @param consumeConcurrentlyContext consume context supplied by the client (unused)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Handle every message of the batch: the whole batch is acknowledged below, so looking
        // only at the first element would drop the rest when the batch size is greater than 1.
        for (MessageExt messageExt : list) {
            handleMessage(messageExt);
        }
        // Consumed successfully.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Logs one message and, if it belongs to the configured topic, dispatches on its tag. */
    private void handleMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        // Decode with an explicit charset instead of the platform default.
        String text = body == null ? null : new String(body, StandardCharsets.UTF_8);
        logger.info("接受到的消息为:{}", text);

        // Already retried the maximum number of times: acknowledge without processing again.
        if (messageExt.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
            return;
        }
        if (!Objects.equals(rocketTopic, messageExt.getTopic())) {
            return;
        }

        String tags = messageExt.getTags();
        // A message sent without a tag has null tags; switching on null would throw.
        switch (tags == null ? "" : tags) {
            case "rocketTag":
                logger.info("开户 tag == >>{}", tags);
                break;
            default:
                logger.info("未匹配到Tag == >>{}", tags);
                break;
        }
    }
}
4、controller 层
package com.boot.rocket.queue.controller; import com.boot.rocket.queue.service.RocketMqService; import org.apache.rocketmq.client.producer.SendResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * Created by Administrator on 2020/6/19 0019. */ @RestController @RequestMapping("/mq") public class RocketController { @Autowired private RocketMqService rocketMqService; @RequestMapping("/sendMsg") public SendResult sendMsg (){ String msg = "OpenAccount Msg"; SendResult sendResult = null; try { sendResult = rocketMqService.sendMsg(msg) ; } catch (Exception e) { e.printStackTrace(); } return sendResult ; } } 5、service 层 package com.boot.rocket.queue.service; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * Created by Administrator on 2020/6/19 0019. */ @Service public class RocketMqService { @Value("${rocket.group}") public String rocketGroup ; @Value("${rocket.topic}") public String rocketTopic ; @Value("${rocket.tag}") public String rocketTag ; @Resource private DefaultMQProducer defaultMQProducer; public SendResult sendMsg(String msgInfo) { // 可以不使用Config中的Group defaultMQProducer.setProducerGroup(rocketGroup); SendResult sendResult = null; try { Message sendMsg = new Message(rocketTopic, rocketTag, "open_account_key", msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } return sendResult ; } }