zoukankan      html  css  js  c++  java
  • 十四、死信、延时和优先级队列

    死信队列

    DLX(Dead-Letter-Exchange),可以称为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。

    消息变成死信队列有下面几个情况:

    1. 消息被拒绝(channel.basicNack或channel.basicReject),并且设置requeue参数为false;
    2. 消息过期;
    3. 队列达到最大长度。

    在声明队列时,在队列参数属性中指定DLX,RabbitMQ就会将死信重新发布到指定的DLX上,从而被路由到死信队列中。通过监听这个队列的消息进行处理,并将消息的TTL设置为0配合使用可以弥补immediate参数的功能。

    • 通过在channel.queueDeclare方法添加DLX:

       //创建DLX
      channel.exchangeDeclare("dlx_exchange", BuiltinExchangeType.DIRECT, true);
      Map<String, Object> arguments = new HashMap<>();
      arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
      //为队列添加DLX
      channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments);
      

      也可以为DLX指定路由键,如果没有指定,则使用原队列的路由键:

      arguments.put("x-dead-letter-routing-key", DLX_QUEUE);
      

      下面代码设置了TTL、DLX、DLK(DLX路由键):

      public class Send {
          final static String NOREMAL_EXCHANGE = "normal_exchange";
          final static String NOREMAL_QUEUE= "normal_queue";
          final static String DLX_EXCHANGE= "dlx_exchange";
          final static String DLX_QUEUE= "dlx_queue";
      
          public static void main(String[] args) {
              Connection connection = null;
              Channel channel;
              try {
                  connection = ConnectionUtils.getConnection();
                  channel = connection.createChannel();
      
                  //正常交换器
                  channel.exchangeDeclare(NOREMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
                  //创建死信交换器
                  channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true);
      
                  //设置正常队列参数,添加死信队列
                  Map<String, Object> arguments = new HashMap<>();
                  //队列消息过期时间
                  arguments.put("x-message-ttl", 1000);
                  //设置死信队列
                  arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
                  //设置DLX路由键,不设置,则使用原队列的路由键
                  arguments.put("x-dead-letter-routing-key", DLX_QUEUE);
                  channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments);
                  channel.queueBind(NOREMAL_QUEUE, NOREMAL_EXCHANGE, NOREMAL_QUEUE);
      
                  channel.queueDeclare(DLX_QUEUE, true, false, false, null);
                  channel.queueBind(DLX_QUEUE,DLX_EXCHANGE,DLX_QUEUE);
      
                  channel.basicPublish(NOREMAL_EXCHANGE, NOREMAL_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, "死信队列".getBytes("utf-8"));
              } catch (IOException e) {
                  e.printStackTrace();
              } catch (TimeoutException e) {
                  e.printStackTrace();
              } finally {
                  if (connection != null) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      }
      

      代码里创建了两个交换器normal_exchangedlx_exchange,分别绑定了两个队列normal_exchangedlx_exchange

      运行上面代码后,10秒后,在Web管理页面中,可以看到如下内容。在10秒消息过期后变成死信,消息发布到了死信队列。也可以看到第二列中的DTTLDLX标记,D为持久化durable

    延时队列

    延时队列存储的对象是对应的延迟消息,延迟消息指当消息被发送到队列后,并不立即被消费者拿到消息,而是等待特定的时间后,消费者才能拿到消息。

    延时队列应用场景有:

    • 在订单系统中,用户下单后,通常有30分钟的付款时间。如果30分钟后没有付款,则订单被取消,这里可以使用延时队列处理订单。
    • 用户通过远程控制家里的智能设备在指定时间进行工作,可以将用户指定发送到延时队列,到达时间后再推送到智能设备。

    在RabbitMQ中,延时队列通过前面的DLX和TTL共同作用可以模拟出延迟队列功能。

    和死信队列消费者监听正常队列不同,延时队列中消费者监听的是死信队列。当消费在设置DLX和TTL后,发送到队列中经过指定时候后变成死信,死信重新发送到死信队列,而消费者监听到死信队列中有消息而进行消费,这样就达到了消息的延时。

    优先级队列、消息

    优先级队列,具有优先级的队列具有高的优先权,优先级高的消息具有优先被消费的特权。

    通过设置队列的x-max-priority参数设置队列优先级,之后在发送消息时设置消费优先级:

    public class Send {
        final  static String exchange = "priority_test";
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel;
            try {
                connection = ConnectionUtils.getConnection();
                channel = connection.createChannel();
                channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
    
                //设置队列的优先级为10
                Map<String, Object> arguments = new HashMap<>();
                arguments.put("x-max-priority", 10);
                channel.queueDeclare(exchange, true, false, false, arguments);
    
                channel.queueBind(exchange, exchange, exchange);
    
                //设置消息的优先级
                AMQP.BasicProperties build = new AMQP.BasicProperties
                        .Builder()
                        .priority(5)   //设置消息的优先级为5,默认最低为0。最大不能超过队列优先级
                        .build();
    
                channel.basicPublish(exchange, exchange, build, "队列、消息优先级".getBytes("utf-8"));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    运行上面代码,可以在Web管理页面看到优先级队列第二列有Pri标志:

  • 相关阅读:
    html5中input弹窗时,不弹出输入法弹出。
    ajax异步提交
    WinForm更新文件
    固态硬盘上安装Windows8(ghost)启动问题
    刷新页面Js
    流媒体
    WebOffice上传Word限制设置
    js页面传参中文乱码问题
    weboffice(点聚)在传参为汉字时的乱码问题
    Linq中Lanbda表达式做参数
  • 原文地址:https://www.cnblogs.com/zenghi-home/p/10065439.html
Copyright © 2011-2022 走看看