zoukankan      html  css  js  c++  java
  • Sping Boot入门到实战之实战篇(一):实现自定义Spring Boot Starter——阿里云消息队列服务Starter

      在 Sping Boot入门到实战之入门篇(四):Spring Boot自动化配置 这篇中,我们知道Spring Boot自动化配置的实现,主要由如下几部分完成:

    1. @EnableAutoConfiguration注解
    2. SpringApplication类
    3. spring-boot-autoconfigure jar包
    4. spring.factories文件

      官方提供的starter,大多包含两个jar包: 一个starter——没有任何实现,只用来管理依赖(spring.providers文件声明),一个autoconfigure包——包含所有具体实现,包括自动配置类,及META-INF/spring.factories文件。自定义starter的时候,可以合并写到一个。

      官方提供的starter,命名遵循spring-boot-starter-xx, 自定义starter,命名遵循xx-spring-boot-starter。

      本文基于阿里云消息队列RocketMQ服务(https://help.aliyun.com/document_detail/43349.html?spm=a2c4g.11186623.3.2.Ui5KeU),实现一个自定义starter,以实现定时消息与延迟消息(如订单多久未支付自动关闭等)发送与接收功能的快速开发。源码地址: mq-spring-boot-starter

      

      1 创建mq-spring-boot-starter maven项目。pom.xml中引入依赖:

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-autoconfigure</artifactId>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.7.0.Final</version>
            </dependency>
        </dependencies>

      2 定义属性配置类。读取属性配置文件中以"aliyun.mq"开头的属性。

    @ConfigurationProperties(prefix = "aliyun.mq")
    public class MqPropertiesConfig {
        private String onsAddr;
        private String topic;
        private String accessKey;
        private String secretKey;
        private Properties producer;
        private Properties consumer;
        private String tagSuffix;
    
        setter/getter;
    }

      3 定义自动配置类。其中 @ConditionalOnProperty(prefix = "aliyun.mq.consumer",value = "enabled",havingValue = "true") 表示当配置的属性中,存在属性aliyun.mq.consumer.enabled,且值为true时,才实例化该Bean。因为某些应用,只需要生产者或消费者,可以通过这个属性来控制是否实例化对应Bean。

    @Configuration
    @EnableConfigurationProperties(MqPropertiesConfig.class)
    public class MqAutoConfig {
    
        @Autowired
        private MqPropertiesConfig propConfig;
    
        @Bean(initMethod="start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.producer",value = "enabled",havingValue = "true")
        public MqTimerProducer mqTimerProducer(){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.ProducerId, propConfig.getProducer().getProperty("producerId"));
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
            properties.setProperty("topic", propConfig.getTopic());
            return  new MqTimerProducer(properties);
        }
    
        @Bean(initMethod="start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.consumer",value = "enabled",havingValue = "true")
        public MqConsumer mqConsumer(){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.ConsumerId, propConfig.getConsumer().getProperty("consumerId"));
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
            properties.setProperty("topic", propConfig.getTopic());
            return  new MqConsumer(properties);
        }
    }

      

      4. 定义生产者。send方法采用同步的方式将消息内容body,经过一定延迟delay后,发送到指定消息队列topic,且标签为tag(消费者可以根据tag过滤同一个topic的消息)。sendAsync则通过异步的方式发送消息,消息发送完成后,通过指定的回调SendCallback自定义处理。这里默认回调只是以日志进行记录。

    public class MqTimerProducer {
        private final static Logger LOG = LoggerFactory.getLogger(MqTimerProducer.class);
        private Properties properties;
        private Producer producer;
        private String topic;
    
        public MqTimerProducer(Properties properties) {
            if (properties == null || properties.get(PropertyKeyConst.ProducerId) == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.ONSAddr) == null
                    || properties.get("topic") == null) {
                throw new ONSClientException("producer properties not set properly.");
            }
            this.properties = properties;
            this.topic = properties.getProperty("topic");
        }
    
        public void start() {
            this.producer = ONSFactory.createProducer(this.properties);
            this.producer.start();
        }
    
        public void shutdown() {
            if (this.producer != null) {
                this.producer.shutdown();
            }
        }
    
        public void send(String tag, String body, long delay) {
            LOG.info("start to send message. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            SendResult result = this.producer.send(message);
            LOG.info("send message success. ", result.toString());
        }
    
    
        public void sendAsync(String tag, String body, long delay) {
            this.sendAsync(tag, body, delay, new DefaultSendCallback());
        }
    
        public void sendAsync(String tag, String body, long delay, SendCallback sendCallback) {
            LOG.info("start to send message async. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            this.producer.sendAsync(message, sendCallback);
        }
    
        setter/getter;
    }

      5 定义消费者。subscribe方法对指定topic的某些标签tags进行消息订阅,当有该topic下带有这些tags(满足其中一个即可)的消息到达时,交由messageListener处理。这里定义了一个抽象类AbstractMessageListener,通过模板方法将消息的处理逻辑统一(正常消费,commit;出现异常,重新消费),消费者只需要继承AbstractMessageListener,实现handle方法完成消息消费即可。

    public class MqConsumer {
    
        private final static Logger LOG = LoggerFactory.getLogger(MqConsumer.class);
        private Properties properties;
        private Consumer consumer;
        private String topic;
    
        public MqConsumer(Properties properties) {
            if (properties == null || properties.get(PropertyKeyConst.ConsumerId) == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.ONSAddr) == null
                    || properties.get("topic") == null) {
                throw new ONSClientException("consumer properties not set properly.");
            }
            this.properties = properties;
            this.topic = properties.getProperty("topic");
        }
    
        public void start() {
            this.consumer = ONSFactory.createConsumer(properties);
            this.consumer.start();
        }
    
        public void shutdown() {
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
        }
    
        /**
         * @param tags            多个tag用'||'拼接,所有用*
         * @param messageListener
         */
        public void subscribe(String tags, AbstractMessageListener messageListener) {
            LOG.info("subscribe [topic: {}, tags: {}, messageListener: {}]", topic, tags, messageListener.getClass().getCanonicalName());
            consumer.subscribe(topic, tags, messageListener);
        }
    }
    public abstract class AbstractMessageListener implements MessageListener {
    
        private final static Logger LOG = LoggerFactory.getLogger(AbstractMessageListener.class);
    
        public abstract void handle(String body);
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            LOG.info("receive message. [topic: {}, tag: {}, body: {}, msgId: {}, startDeliverTime: {}]", message.getTopic(), message.getTag(), new String(message.getBody()), message.getMsgID(), message.getStartDeliverTime());
            try {
                handle(new String(message.getBody()));
                LOG.info("handle message success.");
                return Action.CommitMessage;
            } catch (Exception e) {
                //消费失败
                LOG.warn("handle message fail, requeue it.", e);
                return Action.ReconsumeLater;
            }
        }
    }

      6 前面已完成了所有消息队列服务相关功能的代码实现,要使引用项目自动进行配置,还需定义META-INF/spring.factories文件,将自动配置类赋值给org.springframework.boot.autoconfigure.EnableAutoConfiguration。

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.ieyecloud.springboot.mq.config.MqAutoConfig

      7 使用

      7.1 pom.xml引入依赖,当前version:1.0.0-SNAPSHOT

    <dependency>
         <groupId>com.ieyecloud</groupId>
         <artifactId>mq-spring-boot-starter</artifactId>
         <version>1.0-SNAPSHOT</version>
     </dependency>

      7.2 application配置文件中添加相应配置

    aliyun:
         mq:
             onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
             topic: mulin_topic_test
             accessKey: xxx
             secretKey: xxx
             producer:
                 enabled: true  #为false表示不引入producer,为true则producerId必须提供
                 producerId: xxx
             consumer:
                 enabled: true  #为false表示不引入consumer,为true则consumerId必须提供
             consumerId: xxx

      7.3 使用producer,consumer只需要在相应类中依需要注入对应实例

    @Autowired
     private MqTimerProducer producer;
    
     @Autowired
     private MqConsumer consumer;

      7.4 consumer监听处理类实现,继承AbstractMessageListener类,实现handle方法即可,如

     @Component
     public class QuestionStatusMessageListener extends AbstractMessageListener{
    
         @Autowired
         private QuickQuestionService questionService;
    
         @Override
         public void handle(String s) {
             QuestionStatusMessage message = JsonUtil.fromJson(s, QuestionStatusMessage.class);
             questionService.updateStatus(message.getQid(), message.getCs(), message.getTs());
         }
     }
  • 相关阅读:
    Oracle OMF 创建数据库
    Oracle 数据库修改数据文件
    oracle 12c 安装文档
    mysql5.7 多次增量备份及恢复测试
    Mysql 数据库模拟误操作基于binlog恢复
    基于scn号恢复oracle数据库测试
    读源码 | metisMenu侧边栏插件
    JavaScript | JQuery插件定义方法
    JavaScript | 模拟文件拖选框样式 v1.0
    JavaScript | JSON基本格式
  • 原文地址:https://www.cnblogs.com/spec-dog/p/8652551.html
Copyright © 2011-2022 走看看