zoukankan      html  css  js  c++  java
  • rabbitmq的延迟队列(五)

    延迟队列

    在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

     rabbitmq-high-producer项目

     application.properties文件

    复制代码
    server.port=8081
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #rabbitmq限流,必须在ack确认才能使用
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #最大的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
    spring.rabbitmq.listener.simple.prefetch=2
            
    复制代码
    
    
    DelayController类
    package com.qingfeng.rabbitmqhighproducer.delay;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    @RestController
    @RequestMapping("delay")
    public class DelayController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
    
        // 发送延迟队列消息
        //http://127.0.0.1:8081/delay/sendDelay
        @GetMapping("/sendDelay")
        public String sendDelay() {
            String messageId = String.valueOf(UUID.randomUUID());
            //order_exchange正常交换机  test.order.wq:正常交换机与正常绑定的队列的路由
            rabbitTemplate.convertAndSend("order_exchange", "order.wq", "订单id"+messageId);
            return "ok";
        }
    
    }
    DelayQueueRabbitConfig类
    package com.qingfeng.rabbitmqhighproducer.delay.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 延迟队列
     */
    @Configuration
    public class DelayQueueRabbitConfig {
    
        //正常队列名称
        public static final String ORDER_QUEUE = "order_queue";
        //正常交换机名称
        public static final String ORDER_EXCHANGE = "order_exchange";
    
        //ttl过期时间  10s
        private static final int ORDER_EXPIRATION = 10000;
    
        //设置正常队列长度限制
        private static final int ORDER_LENGTH = 10;
    
        //死信队列名称
        public static final String ORDER_DLX_QUEUE = "order_dlx_queue";
        //死信交换机名称
        public static final String ORDER_DLX_EXCHANGE = "order_dlx_exchange";
    
    
        //声明正常交换机
        @Bean("orderExchange")
        public TopicExchange orderExchange(){
            return new TopicExchange(ORDER_EXCHANGE);
        }
    
        //声明正常队列绑定死信队列的交换机
        //.withArgument("x-dead-letter-routing-key", "order.cencel")为死信队列和死信交换机的绑定路由dlx.order.#
        @Bean("orderQueue")
        public Queue orderQueue(){
            return QueueBuilder.durable(ORDER_QUEUE)
                    .withArgument("x-dead-letter-exchange", ORDER_DLX_EXCHANGE)
                    .withArgument("x-dead-letter-routing-key", "dlx.order.cancel")
                    .withArgument("x-message-ttl", ORDER_EXPIRATION)
                    .withArgument("x-max-length",ORDER_LENGTH)
                    .build();
        }
    
        //声明正常队列和正常交换机的绑定
        @Bean
        public Binding orderBinding(){
            return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.#");
        }
    
    //=========================================================================
    
        //声明死信队列
        @Bean("orderDlxQueue")
        public Queue orderDlxQueue(){
            return new Queue(ORDER_DLX_QUEUE);
        }
        //声明死信交换机
        @Bean("orderDlxExchange")
        public TopicExchange orderDlxExchange(){
            return new TopicExchange(ORDER_DLX_EXCHANGE);
        }
        //声明死信队列和死信交换机的绑定
        @Bean
        public Binding orderDlxBinding(){
            return BindingBuilder.bind(orderDlxQueue()).to(orderDlxExchange()).with("dlx.order.#");
        }
    
    
    }
    启动rabbitmq-high-producer项目
    访问:http://127.0.0.1:8081/delay/sendDelay

     我们在设置的ttl过期时间10000毫秒过后,也就是10秒后,正常队列的消息会转到死信队列里面去

    rabbitmq-high-consumer项目的

    DelayListener类
    package com.qingfeng.rabbitmqhighconsumer.delay;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer ACK机制:
     *  1. 设置手动签收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *  2. 让监听器类实现ChannelAwareMessageListener接口
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     */
    
    @Component
    public class DelayListener {
    
        //手动签收
        //延迟队列消息,是到了死信队列里面去了,我们要监听死信队列
        @RabbitHandler
        @RabbitListener(queues = "order_dlx_queue")
        public void onMessage(Message message, Channel channel) throws Exception {
            //Thread.sleep(1000);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println("接受到的消息为"+new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                System.out.println("根据订单id查询其状态...");
                System.out.println("判断状态是否为支付成功");
                System.out.println("取消订单,回滚库存....");
                //int i = 1/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                /**
                 * 4.有异常就拒绝签收
                 * basicNack(long deliveryTag, boolean multiple, boolean requeue)
                 * 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费
                 * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
                 * alse:将消息丢弃
                 */
                System.out.println("有异常就拒绝签收");
                //拒绝签收,不重回队列,requeue为false,这样才能到死信队列里面去
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }

    但我们启动rabbitmq-high-consumer项目

    延迟队列小结:

      1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

      2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

  • 相关阅读:
    HTML文档类型DTD与浏览器怪异模式
    css控制非固定文本自动换行
    CSS圆角兼容IE6
    css命名规则
    一个程序员的世界(三) 全:(原名:一个程序员的世界观)
    Apache Ignite 学习笔记(6): Ignite中Entry Processor使用
    Apache Ignite 学习笔记(五): Primary和backup数据同步模式和处理分片丢失的策略
    Apache Ignite 学习笔记(四): Ignite缓存冗余备份策略
    Apache Ignite 学习笔记(三): Ignite Server和Client节点介绍
    Apache Ignite 学习笔记(二): Ignite Java Thin Client
  • 原文地址:https://www.cnblogs.com/Amywangqing/p/14696155.html
Copyright © 2011-2022 走看看