zoukankan      html  css  js  c++  java
  • springboot 整合 RocketMQ

    1、pom文件

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starters</artifactId>
            <version>2.1.3.RELEASE</version>
        </parent>
    <dependencies>
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-web</artifactId>
    	</dependency>
    	<dependency>
    		<groupId>org.apache.rocketmq</groupId>
    		<artifactId>rocketmq-client</artifactId>
    		<version>4.4.0</version>
    	</dependency>
    </dependencies>
    

    2、yml文件

    server:
      port: 9001
    
    spring:
      application:
        name: ware01-rocket-queue
    
    
    
    rocketmq:
      # 生产者配置
      producer:
        isOnOff: on
        # 发送同一类消息的设置为同一个group,保证唯一
        groupName: rocketGroup
        # 服务地址
        namesrvAddr: 127.0.0.1:9876
        # 消息最大长度(单位:字节),客户端默认 1024*1024*4(4M);此处 4096 即 4KB
        maxMessageSize: 4096
        # 发送消息超时时间,默认3000
        sendMsgTimeout: 3000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
      # 消费者配置
      consumer:
        isOnOff: on
        # 官方建议:确保同一组中的每个消费者订阅相同的主题。
        groupName: rocketGroup
        # 服务地址
        namesrvAddr: 127.0.0.1:9876
        # 接收该 Topic 下所有 Tag
        topics: rocketTopic~*;
        consumeThreadMin: 20
        consumeThreadMax: 64
        # 设置一次消费消息的条数,默认为1条
        consumeMessageBatchMaxSize: 1
    
    # 配置 Group  Topic  Tag
    rocket:
      group: rocketGroup
      topic: rocketTopic
      tag: rocketTag
    

    3、rocketMQ配置

    3.1 RocketMQ 生产者配置

    package com.boot.rocket.queue.rocket;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that builds and starts the shared RocketMQ producer.
     *
     * <p>All settings are read from the {@code rocketmq.producer.*} properties.
     */
    @Configuration
    public class ProducerConfig {
        private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class);

        @Value("${rocketmq.producer.groupName}")
        private String groupName;
        @Value("${rocketmq.producer.namesrvAddr}")
        private String namesrvAddr;
        @Value("${rocketmq.producer.maxMessageSize}")
        private Integer maxMessageSize;
        @Value("${rocketmq.producer.sendMsgTimeout}")
        private Integer sendMsgTimeout;
        @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
        private Integer retryTimesWhenSendFailed;

        /**
         * Creates the producer bean, applies the configured limits and starts it.
         *
         * <p>If starting fails the error is logged and the (unstarted) producer is still
         * returned, so the application context keeps loading; sends will then fail later.
         *
         * @return the configured producer
         */
        @Bean
        public DefaultMQProducer getRocketMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr(namesrvAddr);

            // If several producers in the same JVM must send to different MQ clusters,
            // each of them needs its own instanceName.

            // Maximum message size in bytes; the client default is 1024 * 1024 * 4 (4 MB).
            if (maxMessageSize != null) {
                producer.setMaxMessageSize(maxMessageSize);
            }
            // Send timeout in milliseconds; default 3000.
            if (sendMsgTimeout != null) {
                producer.setSendMsgTimeout(sendMsgTimeout);
            }
            // Number of retries after a failed send; default 2.
            if (retryTimesWhenSendFailed != null) {
                producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
            }

            try {
                producer.start();
            } catch (MQClientException e) {
                // Report through the application logger (with context) instead of stderr.
                logger.error("Failed to start RocketMQ producer, group={}, namesrvAddr={}",
                        groupName, namesrvAddr, e);
            }
            return producer;
        }

    }
    

    3.2 RocketMQ 消费者配置

    package com.boot.rocket.queue.rocket;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    /**
     * Spring configuration that builds, subscribes and starts the RocketMQ push consumer.
     *
     * <p>All settings are read from the {@code rocketmq.consumer.*} properties.
     */
    @Configuration
    public class ConsumerConfig {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);

        /** Tag expression used when a subscription entry names a topic but no tag. */
        private static final String ALL_TAGS = "*";

        @Value("${rocketmq.consumer.namesrvAddr}")
        private String namesrvAddr;
        @Value("${rocketmq.consumer.groupName}")
        private String groupName;
        @Value("${rocketmq.consumer.consumeThreadMin}")
        private int consumeThreadMin;
        @Value("${rocketmq.consumer.consumeThreadMax}")
        private int consumeThreadMax;
        /** Subscriptions in the form {@code topic~tagExpression;topic~tagExpression;...}. */
        @Value("${rocketmq.consumer.topics}")
        private String topics;
        @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
        private int consumeMessageBatchMaxSize;

        @Resource
        private RocketMsgListener msgListener;

        /**
         * Creates the push consumer bean, registers the listener, subscribes to every
         * configured topic and starts the consumer.
         *
         * <p>If subscribing or starting fails the error is logged and the consumer is still
         * returned, so the application context keeps loading.
         *
         * @return the configured consumer
         */
        @Bean
        public DefaultMQPushConsumer getRocketMQConsumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeThreadMin(consumeThreadMin);
            consumer.setConsumeThreadMax(consumeThreadMax);
            consumer.registerMessageListener(msgListener);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
            try {
                subscribeTopics(consumer);
                consumer.start();
            } catch (MQClientException e) {
                // Report through the application logger (with context) instead of stderr.
                logger.error("Failed to start RocketMQ consumer, group={}, namesrvAddr={}",
                        groupName, namesrvAddr, e);
            }
            return consumer;
        }

        /**
         * Parses the {@code topics} property and subscribes the consumer to each entry.
         *
         * <p>Blank entries are ignored, entries without a topic are skipped with a warning,
         * and an entry without a tag expression subscribes to all tags.
         */
        private void subscribeTopics(DefaultMQPushConsumer consumer) throws MQClientException {
            for (String rawEntry : topics.split(";")) {
                String entry = rawEntry.trim();
                if (entry.isEmpty()) {
                    continue;
                }
                String[] topicTag = entry.split("~");
                // "~".split("~") yields an empty array, so check the length before indexing.
                String topic = topicTag.length > 0 ? topicTag[0].trim() : "";
                if (topic.isEmpty()) {
                    logger.warn("Skipping subscription entry without a topic: {}", entry);
                    continue;
                }
                String tagExpression = topicTag.length > 1 ? topicTag[1].trim() : "";
                consumer.subscribe(topic, tagExpression.isEmpty() ? ALL_TAGS : tagExpression);
            }
        }

    }
    

    3.3 RocketMQ 消费者监听器配置

    package com.boot.rocket.queue.rocket;
    
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    import java.util.List;
    
    /**
     * Concurrent RocketMQ listener that logs each delivered message and dispatches on its tag.
     */
    @Component
    public class RocketMsgListener implements MessageListenerConcurrently {
        private static final Logger logger = LoggerFactory.getLogger(RocketMsgListener.class);

        /** Redelivery count at or above which a message is acknowledged without further handling. */
        private static final int MAX_RECONSUME_TIMES = 3;

        /** Charset used to decode message bodies; explicit so it does not depend on the platform default. */
        private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

        @Value("${rocket.topic}")
        public String rocketTopic;

        /**
         * Handles every message in the delivered batch and acknowledges the batch.
         *
         * @param list the delivered messages; may be empty
         * @param consumeConcurrentlyContext the consume context (unused)
         * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (CollectionUtils.isEmpty(list)) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            // The batch may hold more than one message when consumeMessageBatchMaxSize > 1,
            // so every element is handled rather than only the first.
            for (MessageExt messageExt : list) {
                handleMessage(messageExt);
            }
            // Message consumption succeeded.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        /** Logs one message and, unless it has exhausted its retries, dispatches on its tag. */
        private void handleMessage(MessageExt messageExt) {
            logger.info("接受到的消息为:{}", new String(messageExt.getBody(), BODY_CHARSET));

            // The message has already been retried the maximum number of times: do not process it again.
            if (messageExt.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
                return;
            }
            if (!messageExt.getTopic().equals(rocketTopic)) {
                return;
            }

            String tags = messageExt.getTags();
            // switch on a null String throws NullPointerException, so map "no tag" to the default branch.
            switch (tags == null ? "" : tags) {
                case "rocketTag":
                    logger.info("开户 tag == >>{}", tags);
                    break;
                default:
                    logger.info("未匹配到Tag == >>{}", tags);
                    break;
            }
        }
    }
    

    4、controller 层

    package com.boot.rocket.queue.controller;
    
    import com.boot.rocket.queue.service.RocketMqService;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * REST endpoints under {@code /mq} for sending messages through {@link RocketMqService}.
     */
    @RestController
    @RequestMapping("/mq")
    public class RocketController {
        @Autowired
        private RocketMqService rocketMqService;

        /**
         * Sends a fixed demo message.
         *
         * @return the send result, or {@code null} if sending threw an exception
         */
        @RequestMapping("/sendMsg")
        public SendResult sendMsg() {
            final String payload = "OpenAccount Msg";
            try {
                return rocketMqService.sendMsg(payload);
            } catch (Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
    }
    5、service 层
    
    package com.boot.rocket.queue.service;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    
    /**
     * Service that publishes messages to the configured RocketMQ topic and tag.
     */
    @Service
    public class RocketMqService {

        @Value("${rocket.group}")
        public String rocketGroup;
        @Value("${rocket.topic}")
        public String rocketTopic;
        @Value("${rocket.tag}")
        public String rocketTag;

        @Resource
        private DefaultMQProducer defaultMQProducer;

        /**
         * Sends {@code msgInfo} (encoded as UTF-8) to {@code rocketTopic} with {@code rocketTag}.
         *
         * @param msgInfo the message text
         * @return the send result, or {@code null} if building or sending the message failed
         */
        public SendResult sendMsg(String msgInfo) {
            // Overrides the group that was set in the producer configuration.
            // NOTE(review): this mutates the shared producer bean on every call — confirm this is
            // intended rather than relying on the group configured when the producer is created.
            defaultMQProducer.setProducerGroup(rocketGroup);
            SendResult sendResult = null;
            try {
                // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
                byte[] body = msgInfo.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                Message sendMsg = new Message(rocketTopic, rocketTag, "open_account_key", body);
                sendResult = defaultMQProducer.send(sendMsg);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so callers further up can still observe it.
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
            }
            return sendResult;
        }
    }
    

    github地址:https://github.com/shibaobei/middle-ware-parent

  • 相关阅读:
    sqlserver内存释放
    Windows任务管理器中内存使用、虚拟内存区别及与页面文件的关系
    GetMessage
    String.Format(string, arg0)中sring格式
    C#基础--之数据类型
    C# Socket
    C# 对象 序列化 XML
    C# Monitoring-network
    Nginx 网址
    WinSCP 连接 Ubuntu 拒绝的问题
  • 原文地址:https://www.cnblogs.com/bao-bei/p/13162975.html
Copyright © 2011-2022 走看看