zoukankan      html  css  js  c++  java
  • rabbitmq队列延迟

    1. 场景:“订单下单成功后,15分钟未支付自动取消”


    1.传统处理超时订单

    采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
    并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
    即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
    然后再做其他的业务操作

     

     

    2.rabbitMQ延时队列方案


    一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
    并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失


    2. TTL和DLX


    rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

    1.TTL


    TTL是Time To Live的缩写, 也就是生存时间。
    RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
    如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
    默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消息,否则丢弃。

    设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
    设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0

    2.DLX和死信队列


    DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

    死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
    这个队列,就是死信队列

    成为死信一般有以下几种情况:
    消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
    消息的TTL-存活时间已经过期
    队列长度限制被超越(队列满)


    注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
    注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
    x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键

    3. 延迟队列


    通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费


    注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

    创建主模块,及子模块

    2.主模块
    <!-- 1.packaging模式改为pom -->
    <packaging>pom</packaging>

    <!-- 2.添加子模块 -->
    <modules>
    <module>rabbitmq-provider</module>
    <module>rabbitmq-consumer</module>
    <module>common-vo</module>
    </modules>

    在主模块的POM的<dependencies>中添加公共子模块common

    1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

    关键代码如下

    package com.hmc.rabbitmqprovider.rabbitmq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.xml.ws.WebEndpoint;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-23 23:39
     */
    @Configuration
    public class RabbitQueueConfig {
    
        //定义队列,交换机,路由键(正常)
        public  static  final  String NORMAL_QUEUE="normar-queue";
        public  static  final  String NORMAL_EXCHANGE="normar-exchange";
        public  static  final  String NORMAL_ROUTINGKEY="normar-routingkey";
    
    
    
        //定义队列,交换机,路由键(死信)
        public  static  final  String DELAY_QUEUE="delay-queue";
        public  static  final  String DELAY_EXCHANGE="delay-exchange";
        public  static  final  String DELAY_ROUTINGKEY="delay-routingkey";
    
    
         @Bean
        public Queue norma1Queue(){
             Map<String,Object> map=new HashMap<>();
             map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为10秒
             map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
             map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
            return  new Queue(NORMAL_QUEUE,true,false,false,map);
        }
        //直连交换机
        @Bean
        public DirectExchange normalExchange(){
             return  new DirectExchange(NORMAL_EXCHANGE,true,false);
        }
    
        @Bean
        public Binding normalBinding(){
             return BindingBuilder.bind(norma1Queue()).to(normalExchange())
                     .with(NORMAL_ROUTINGKEY);
        }
    
    //死信
    
       @Bean
       public  Queue delaQueue(){
             return new Queue(DELAY_QUEUE,true);
       }
    
       @Bean
       public  DirectExchange  delayExchange(){
             return  new DirectExchange(DELAY_EXCHANGE,true,false);
    
       }
    
         @Bean
        public  Binding delayBinding(){
          return  BindingBuilder.bind(delaQueue()).to(delayExchange())
               .with(DELAY_ROUTINGKEY);
       }
    }

     controller层

    package com.hmc.rabbitmqprovider.controller;
    
    import com.hmc.commonvo.vo.OrderVo;
    import com.hmc.rabbitmqprovider.rabbitmq.RabbitQueueConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 00:02
     */
    @RestController
    @Slf4j
    public class SendController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/senderBymodel")
        public  Map<String,Object> senderBymodel(String orderNo){
            OrderVo orderVo=new OrderVo();
    
            orderVo.setOrderId(10000001L);
            orderVo.setOrderNo(orderNo);
            orderVo.setCreatedate(new Date());
            //推送消息
            rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
            log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }
    
    
         @RequestMapping("/sender")
        public  Map<String,Object> sender(){
            Map<String,Object> data=this.createData();
    
            //推送消息
             rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                     RabbitQueueConfig.NORMAL_ROUTINGKEY,data);
             log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                     RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }
    
        private Map<String,Object> createData(){
            Map<String,Object> data=new HashMap<>();
            String  createData= LocalDateTime.now().
                    format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            String message="hello rabbitmq!!!";
            data.put("createData",createData);
            data.put("message",message);
            return data;
        }
    
    
    }

    yml文件配置

    #服务的端口和项目名
    server:
      port: 8081
      servlet:
        context-path: /rabbitmq-provider
    
    ## rabbitmq config
    spring:
        rabbitmq:
          host: 192.168.197.134
          port: 5672
          username: springcloud
          password: 123456
    
         ## 与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~
          virtual-host: my_vhost

    pom配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.hmc</groupId>
            <artifactId>rabbitmq03</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>rabbitmq-provider</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-provider</name>
        <packaging>jar</packaging>
        <description>Demo project for Spring Boot</description>
    
    
    </project>

      

    开启虚拟机测试生产者

     

    这里可以看出消息10秒未消费的情况下,会推送到死信队列,测试是成功的。

    接下来我们用消费者进行消费

    RabbitDelayReceiver
    
    
    
    
    

     3条消息已经成功消费

    json转换

      生产者代码

    package com.hmc.rabbitmqprovider.controller;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 00:57
     */
    
    /**
     * json转换
     1.生产者
     */
    @Configuration
    public class RabbitTemplateConfig {
    
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
               RabbitTemplate rabbitTemplate=new RabbitTemplate();
               rabbitTemplate.setConnectionFactory(connectionFactory);
               rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
               return  rabbitTemplate;
        }
    
        @Bean
        public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
         return new Jackson2JsonMessageConverter();
        }
    }

    消费者代码

    package com.hmc.rabbitmqconsumer.rabbitmq;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 01:16
     */
    
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * json转换
     * 消费者
     */
    @Configuration
    public class RabbitListenerReceiverConfig {
    
        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(jackson2JsonMessageConverter());
            return factory;
        }
    
        @Bean
        public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
    
    }

    yml文件配置

    #服务的端口和项目名
    server:
      port: 8082
      servlet:
        context-path: /rabbitmq-consumer
    
    ## rabbitmq config
    spring:
        rabbitmq:
          host: 192.168.197.134
          port: 5672
          username: springcloud
          password: 123456
    
         ## 与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~
          virtual-host: my_vhost

    pom配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.hmc</groupId>
            <artifactId>rabbitmq03</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>rabbitmq-cousumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-cousumer</name>
        <packaging>jar</packaging>
        <description>Demo project for Spring Boot</description>
    
    
    </project>

    controller

    RequestMapping("/senderBymodel")
        public  Map<String,Object> senderBymodel(String orderNo){
            OrderVo orderVo=new OrderVo();
    
            orderVo.setOrderId(10000001L);
            orderVo.setOrderNo(orderNo);
            orderVo.setCreatedate(new Date());
            //推送消息
            rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
            log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }

    测试

    消息格式

    消费者接收消息格式

  • 相关阅读:
    [转]Python跳过第一行读取文件内容
    Batch Apex之Database.Stateful
    Package.xml文件可取得Metadata
    SFDC 关于Custom Object在Lightingアプリケーションビルダー中可以配置「Chatter」和「活動」
    SFDC 在Batch Apex中使用Aggregate SOQL统计查询语句及结果
    SFDC 为什么Label有时候在同一行,有时候换行了呢?
    SFDC String.isEmpty vs. String.isBlank
    SFDC CustomLabels vs. CustomMetadata
    SFDC Custom Object里无法设置Search Layout的解决方法
    租房注意点
  • 原文地址:https://www.cnblogs.com/xmf3628/p/12097101.html
Copyright © 2011-2022 走看看