zoukankan      html  css  js  c++  java
  • springboot + RocketMq 练习

    最近项目上在使用rocektmq,特此记录一下

    一、pom依赖

            <!-- rocketmq -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.7.1</version>
            </dependency>

    二、配置

    #rocketmq配置
    rocketmq:
      produceGroupName: unique_group_name 
      consumerGroupName: unique_group_name   
      namesrvAddr: 127.0.0.1:9876
      topic: test-service 
       
      producer:
        maxMessageSize: 4096
        sendMsgTimeout: 3000
        retryTimesWhenSendFailed: 2
      consumer:
        consumeThreadMin: 5
        consumeThreadMax: 32
        consumeMessageBatchMaxSize: 1
      

    三、代码

    1、读取配置

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class RocketMqConfig {
    
        //发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
        public static String produceGroupName;
        //消费者分组
        public static String consumerGroupName;
        // mq的nameserver地址
        public static String namesrvAddr;
    
        // 主题
        public static String topic;
    
        //消息最大长度 默认1024*4(4M)
        public static Integer producerMaxMessageSize;
        //发送消息超时时间,默认3000
        public static Integer producerSendMsgTimeout;
        //发送消息失败重试次数,默认2
        public static Integer producerRetryTimesWhenSendFailed;
    
    
        //消费者线程数量
        public static Integer consumeThreadMin;
        public static Integer consumeThreadMax;
        //设置一次消费消息的条数,默认为1条
        public static Integer consumeMessageBatchMaxSize;
    
        @Value("${rocketmq.produceGroupName}")
        public  void setProduceGroupName(String produceGroupName) {
            RocketMqConfig.produceGroupName = produceGroupName;
        }
    
        @Value("${rocketmq.consumerGroupName}")
        public  void setConsumerGroupName(String consumerGroupName) {
            RocketMqConfig.consumerGroupName = consumerGroupName;
        }
    
        @Value("${rocketmq.namesrvAddr}")
        public void setNamesrvAddr(String namesrvAddr) {
            RocketMqConfig.namesrvAddr = namesrvAddr;
        }
    
        @Value("${rocketmq.topic}")
        public void setTopic(String topic) {
            RocketMqConfig.topic = topic;
        }
    
        @Value("${rocketmq.producer.maxMessageSize}")
        public void setProducerMaxMessageSize(Integer producerMaxMessageSize) {
            RocketMqConfig.producerMaxMessageSize = producerMaxMessageSize;
        }
    
        @Value("${rocketmq.producer.sendMsgTimeout}")
        public void setProducerSendMsgTimeout(Integer producerSendMsgTimeout) {
            RocketMqConfig.producerSendMsgTimeout = producerSendMsgTimeout;
        }
    
        @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
        public void setProducerRetryTimesWhenSendFailed(Integer producerRetryTimesWhenSendFailed) {
            RocketMqConfig.producerRetryTimesWhenSendFailed = producerRetryTimesWhenSendFailed;
        }
    
        @Value("${rocketmq.consumer.consumeThreadMin}")
        public void setConsumeThreadMin(Integer consumeThreadMin) {
            RocketMqConfig.consumeThreadMin = consumeThreadMin;
        }
    
        @Value("${rocketmq.consumer.consumeThreadMax}")
        public void setConsumeThreadMax(Integer consumeThreadMax) {
            RocketMqConfig.consumeThreadMax = consumeThreadMax;
        }
    
        @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
        public void setConsumeMessageBatchMaxSize(Integer consumeMessageBatchMaxSize) {
            RocketMqConfig.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
        }
    }

    2、生产者定义

    import xxx.configuration.RocketMqConfig;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * RocketMQ 生产着
     * @Author YHL
     * @Date 2020/7/23 16:14
     * @Version 1.0
     */
    public class BaseServiceMqProducer {
    
        private static Logger log = LoggerFactory.getLogger(BaseServiceMqProducer.class);
    
        private static DefaultMQProducer producer = new DefaultMQProducer(RocketMqConfig.produceGroupName);
        private static int initialState = 0;
    
        private BaseServiceMqProducer() {
    
        }
    
        public static DefaultMQProducer getDefaultMQProducer(){
            if(producer == null){
                producer = new DefaultMQProducer(RocketMqConfig.produceGroupName);
            }
    
            if(initialState == 0){
                producer.setNamesrvAddr(RocketMqConfig.namesrvAddr);
                //消息最大长度 默认1024*4(4M)
                producer.setMaxMessageSize(RocketMqConfig.producerMaxMessageSize);
                //发送消息超时时间
                producer.setSendMsgTimeout(RocketMqConfig.producerSendMsgTimeout);
                // 如果发送消息失败,设置重试次数,默认为2次
                producer.setRetryTimesWhenSendAsyncFailed(RocketMqConfig.producerRetryTimesWhenSendFailed);
                try {
                    producer.start();
                    log.info("rocketmq-producer 启动成功---------------------------------------");
                } catch (MQClientException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                    return null;
                }
    
                initialState = 1;
            }
    
            return producer;
        }
    }

    3、生产者工具类

    import xxx.component.BaseServiceMqProducer;
    import xxx.configuration.RocketMqConfig;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.UnsupportedEncodingException;
    
    
    public class RocketMqProducerUtil {
    
        private static Logger log = LoggerFactory.getLogger(RocketMqProducerUtil.class);
    
        private static String tag = "videoTag";
    
        /**
         * 发送正常消息
         * @param msg
         */
        public static String sendMsg(String msg){
            // 获取消息生产者
            DefaultMQProducer producer = BaseServiceMqProducer.getDefaultMQProducer();
            try {
                Message message = new Message(RocketMqConfig.topic,tag, msg.getBytes("UTF-8"));  // body
                // 日志收集,要求没有那么高,只需要单项发送即可
              SendResult sendResult = producer.send(message);
              SendStatus sendStatus = sendResult.getSendStatus();
              if (sendStatus.equals(SendStatus.SEND_OK)) {
                  log.info("消息发送成功,msg:{}",msg);
                  return "成功";
              } else if (sendStatus.equals(SendStatus.FLUSH_DISK_TIMEOUT)) {
                  log.info("消息发送失败,消息刷盘失败,msg:{}",msg);
                  return "消息发送失败,消息刷盘失败";
              } else if (sendStatus.equals(SendStatus.FLUSH_SLAVE_TIMEOUT)) {
                  log.info("消息发送失败,主从服务器同步超时,msg:{}",msg);
                  return "消息发送失败,主从服务器同步超时";
              } else if (sendStatus.equals(SendStatus.SLAVE_NOT_AVAILABLE)) {
                  log.info("消息发送失败,Broker不可用,msg:{}",msg);
                  return "消息发送失败,Broker不可用";
              } else {
                  log.info("消息发送返回未知状态,msg:{}",msg);
              }
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (RemotingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }
    //        producer.shutdown();
            return "消息发送返回未知状态";
        }
    }

    4、消费者定义

    import xxx.configuration.RocketMqConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    
    
    public class BaseServiceMqConsumer {
    
        private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConfig.consumerGroupName);
        private static int initialState = 0;
    
        private BaseServiceMqConsumer() {
    
        }
    
        public static DefaultMQPushConsumer getDefaultMQPushConsumer(){
            if(consumer == null){
                consumer = new DefaultMQPushConsumer(RocketMqConfig.consumerGroupName);
            }
    
            if(initialState == 0){
                consumer.setNamesrvAddr(RocketMqConfig.namesrvAddr);
                //消费者线程数量
                consumer.setConsumeThreadMin(RocketMqConfig.consumeThreadMin);
                consumer.setConsumeThreadMax(RocketMqConfig.consumeThreadMax);
                //设置一次消费消息的条数,默认为1条
                consumer.setConsumeMessageBatchMaxSize(RocketMqConfig.consumeMessageBatchMaxSize);
                //一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                initialState = 1;
            }
    
            return consumer;
        }
    }

    5、消费者工具类

    import xxx.component.BaseServiceMqConsumer;
    import xxx.configuration.RocketMqConfig;
    import xxx.service.v1.VideoConsumerService;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    
    @Component
    public class RocketMqConsumerUtil {
    
        private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);
    
        @Autowired
        private VideoConsumerService videoConsumerService;
    
        private static String tag = "videoTag";
    
        /**
         * 接收消息
         */
        public void listener(){
    
            // 获取消息生产者
            DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();
    
            // 订阅主体
            try {
                consumer.subscribe(RocketMqConfig.topic, "*");
    
                consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                    /**
                     * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
                     */
                    public ConsumeConcurrentlyStatus consumeMessage(
                            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        MessageExt messageExt = msgs.get(0);
                        String msg = null;
                        try {
                            msg = new String(messageExt.getBody(),"utf-8");
                        } catch (UnsupportedEncodingException e) {
                            log.error("消息编码失败,MsgBody:{}",new String(messageExt.getBody()));
                            e.printStackTrace();
                        }
                        log.info("消费者-bornHost:{},storeHost:{}. groupName:{},topic:{}",messageExt.getBornHost(),messageExt.getStoreHost(),consumer.getConsumerGroup(),RocketMqConfig.topic);
                        log.info("消费开始-MsgBody:{}",msg);
    //                    String msg = new String(messageExt.getBody());
    //                    log.info("MsgBody:{}",new String(messageExt.getBody()));
    
                        if (messageExt.getTopic().equals(RocketMqConfig.topic)) {
                            // topic的消费逻辑
                            if (messageExt.getTags() != null && messageExt.getTags().equals(tag)) {
                                // 根据Tag消费消息,具体消费消息的业务方法
                                videoConsumerService.dealVideoMsg(msg);
    
                            }
                        } else if (messageExt.getTopic().equals("TopicTest2")) {
                            // 执行TopicTest2的消费逻辑
                        }
    
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
    
                /**
                 * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
                 */
                consumer.start();
                log.info("rocketmq-consumer 启动成功---------------------------------------");
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    四、调用说明

    1、生产者工具类可以直接调用

     
        @RequestMapping(value = "/test", method = RequestMethod.POST)
        public void tes(@RequestParam("msg") String msg) {
            RocketMqProducerUtil.sendMsg(msg);
    
        }     
      

    2、消费者工具类需要在项目启动时候初始化一下才可以调用,可以在主启动类上直接调用

        public static void main(String[] args) {
            SpringApplication.run(Jnwsn4residentapiApplication.class, args);
            RocketMqConsumerUtil rocketMqConsumerUtil = new RocketMqConsumerUtil();
            rocketMqConsumerUtil.listener();
        }
    
        

    也可以自定义初始化,需要实现CommandLineRunner接口

    import xxx.util.RocketMqConsumerUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TestRunner implements CommandLineRunner {
    
        @Autowired
        private RocketMqConsumerUtil rocketMqConsumerUtil;
    
        @Override
        public void run(String... args) throws Exception {
            rocketMqConsumerUtil.listener();
        }
    }

    最后,就可以在消费者工具类中注入自己需要的业务处理Service了 ,把VideoConsumerServie注入替换掉就好

  • 相关阅读:
    SAS学习 day10
    SAS学习 day9
    SAS学习 day8
    Python解释器 发展史
    os. 模块
    字典
    类型1
    计算机编码
    EDA 会议整理
    2020-8-27
  • 原文地址:https://www.cnblogs.com/xuchao0506/p/15219658.html
Copyright © 2011-2022 走看看