zoukankan      html  css  js  c++  java
  • rabbitmq队列延迟

    1. 场景:“订单下单成功后,15分钟未支付自动取消”


    1.传统处理超时订单

    采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
    并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
    即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
    然后再做其他的业务操作

     

     

    2.rabbitMQ延时队列方案


    一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
    并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失


    2. TTL和DLX


    rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

    1.TTL


    TTL是Time To Live的缩写, 也就是生存时间。
    RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
    如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
    默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消息,否则丢弃。

    设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
    设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0

    2.DLX和死信队列


    DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

    死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
    这个队列,就是死信队列

    成为死信一般有以下几种情况:
    消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
    消息的TTL-存活时间已经过期
    队列长度限制被超越(队列满)


    注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
    注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
    x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键

    3. 延迟队列


    通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费


    注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

    创建主模块,及子模块

    2.主模块
    <!-- 1.packaging模式改为pom -->
    <packaging>pom</packaging>

    <!-- 2.添加子模块 -->
    <modules>
    <module>rabbitmq-provider</module>
    <module>rabbitmq-consumer</module>
    <module>common-vo</module>
    </modules>

    在主模块的POM的<dependencies>中添加公共子模块common

    1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

    关键代码如下

    package com.hmc.rabbitmqprovider.rabbitmq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.xml.ws.WebEndpoint;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-23 23:39
     */
    @Configuration
    public class RabbitQueueConfig {
    
        //定义队列,交换机,路由键(正常)
        public  static  final  String NORMAL_QUEUE="normar-queue";
        public  static  final  String NORMAL_EXCHANGE="normar-exchange";
        public  static  final  String NORMAL_ROUTINGKEY="normar-routingkey";
    
    
    
        //定义队列,交换机,路由键(死信)
        public  static  final  String DELAY_QUEUE="delay-queue";
        public  static  final  String DELAY_EXCHANGE="delay-exchange";
        public  static  final  String DELAY_ROUTINGKEY="delay-routingkey";
    
    
         @Bean
        public Queue norma1Queue(){
             Map<String,Object> map=new HashMap<>();
             map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为10秒
             map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
             map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键
            return  new Queue(NORMAL_QUEUE,true,false,false,map);
        }
        //直连交换机
        @Bean
        public DirectExchange normalExchange(){
             return  new DirectExchange(NORMAL_EXCHANGE,true,false);
        }
    
        @Bean
        public Binding normalBinding(){
             return BindingBuilder.bind(norma1Queue()).to(normalExchange())
                     .with(NORMAL_ROUTINGKEY);
        }
    
    //死信
    
       @Bean
       public  Queue delaQueue(){
             return new Queue(DELAY_QUEUE,true);
       }
    
       @Bean
       public  DirectExchange  delayExchange(){
             return  new DirectExchange(DELAY_EXCHANGE,true,false);
    
       }
    
         @Bean
        public  Binding delayBinding(){
          return  BindingBuilder.bind(delaQueue()).to(delayExchange())
               .with(DELAY_ROUTINGKEY);
       }
    }

     controller层

    package com.hmc.rabbitmqprovider.controller;
    
    import com.hmc.commonvo.vo.OrderVo;
    import com.hmc.rabbitmqprovider.rabbitmq.RabbitQueueConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 00:02
     */
    @RestController
    @Slf4j
    public class SendController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/senderBymodel")
        public  Map<String,Object> senderBymodel(String orderNo){
            OrderVo orderVo=new OrderVo();
    
            orderVo.setOrderId(10000001L);
            orderVo.setOrderNo(orderNo);
            orderVo.setCreatedate(new Date());
            //推送消息
            rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
            log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }
    
    
         @RequestMapping("/sender")
        public  Map<String,Object> sender(){
            Map<String,Object> data=this.createData();
    
            //推送消息
             rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                     RabbitQueueConfig.NORMAL_ROUTINGKEY,data);
             log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                     RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }
    
        private Map<String,Object> createData(){
            Map<String,Object> data=new HashMap<>();
            String  createData= LocalDateTime.now().
                    format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            String message="hello rabbitmq!!!";
            data.put("createData",createData);
            data.put("message",message);
            return data;
        }
    
    
    }

    yml文件配置

    #服务的端口和项目名
    server:
      port: 8081
      servlet:
        context-path: /rabbitmq-provider
    
    ## rabbitmq config
    spring:
        rabbitmq:
          host: 192.168.197.134
          port: 5672
          username: springcloud
          password: 123456
    
         ## 与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~
          virtual-host: my_vhost

    pom配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.hmc</groupId>
            <artifactId>rabbitmq03</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>rabbitmq-provider</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-provider</name>
        <packaging>jar</packaging>
        <description>Demo project for Spring Boot</description>
    
    
    </project>

      

    开启虚拟机测试生产者

     

    这里可以看出消息10秒未消费的情况下,会推送到死信队列,测试是成功的。

    接下来我们用消费者进行消费

    RabbitDelayReceiver
    
    
    
    
    

     3条消息已经成功消费

    json转换

      生产者代码

    package com.hmc.rabbitmqprovider.controller;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 00:57
     */
    
    /**
     * json转换
     1.生产者
     */
    @Configuration
    public class RabbitTemplateConfig {
    
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
               RabbitTemplate rabbitTemplate=new RabbitTemplate();
               rabbitTemplate.setConnectionFactory(connectionFactory);
               rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
               return  rabbitTemplate;
        }
    
        @Bean
        public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
         return new Jackson2JsonMessageConverter();
        }
    }

    消费者代码

    package com.hmc.rabbitmqconsumer.rabbitmq;
    
    /**
     * @author胡明财
     * @site www.xiaomage.com
     * @company xxx公司
     * @create  2019-12-24 01:16
     */
    
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * json转换
     * 消费者
     */
    @Configuration
    public class RabbitListenerReceiverConfig {
    
        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(jackson2JsonMessageConverter());
            return factory;
        }
    
        @Bean
        public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
    
    }

    yml文件配置

    #服务的端口和项目名
    server:
      port: 8082
      servlet:
        context-path: /rabbitmq-consumer
    
    ## rabbitmq config
    spring:
        rabbitmq:
          host: 192.168.197.134
          port: 5672
          username: springcloud
          password: 123456
    
         ## 与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~与启动容器时虚拟主机名字一致~~~
          virtual-host: my_vhost

    pom配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.hmc</groupId>
            <artifactId>rabbitmq03</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>rabbitmq-cousumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-cousumer</name>
        <packaging>jar</packaging>
        <description>Demo project for Spring Boot</description>
    
    
    </project>

    controller

    RequestMapping("/senderBymodel")
        public  Map<String,Object> senderBymodel(String orderNo){
            OrderVo orderVo=new OrderVo();
    
            orderVo.setOrderId(10000001L);
            orderVo.setOrderNo(orderNo);
            orderVo.setCreatedate(new Date());
            //推送消息
            rabbitTemplate.convertAndSend(RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY,orderVo);
            log.info("生产者发送消息,Exchange={}","routingkey={}",RabbitQueueConfig.NORMAL_EXCHANGE,
                    RabbitQueueConfig.NORMAL_ROUTINGKEY);
            Map<String,Object> json=new HashMap<>();
            json.put("code",1);
            json.put("msg","发送消息成功。。。");
            return json;
        }

    测试

    消息格式

    消费者接收消息格式

  • 相关阅读:
    PAT (Advanced Level) Practice 1054 The Dominant Color (20 分)
    PAT (Advanced Level) Practice 1005 Spell It Right (20 分) (switch)
    PAT (Advanced Level) Practice 1006 Sign In and Sign Out (25 分) (排序)
    hdu 5114 Collision
    hdu4365 Palindrome graph
    单链表查找最大值、两个递增的链表合并并且去重
    蓝桥杯-最短路 (SPFA算法学习)
    蓝桥杯-最大最小公倍数
    Codeforces-470 div2 C题
    蓝桥杯-地宫取宝
  • 原文地址:https://www.cnblogs.com/xmf3628/p/12097101.html
Copyright © 2011-2022 走看看