zoukankan      html  css  js  c++  java
  • Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列

    〇、什么是消息队列

      参考:新手也能看懂,消息队列其实很简单

            RabbitMQ运行模型与名词解释

    一、应答模式

      1.什么是应答?

        消息投递到交换器(exchange)中,交换器给我们的反馈,是保障消息投递成功的一种机制。

      2.测试

      配置:

     1 #选择确认类型为交互
     2 spring.rabbitmq.publisher-confirm-type=correlated

      测试方法:

     1     @Test
     2     /**
     3      * the test is testing confirm-function in rabbitmq
     4      */
     5     void messageSendTestWithConfirm(){
     6 
     7         /*
     8          * 设置消息确认回调方法;
     9          * @ack 为true时,表示投递成功;为false表示投递失败;
    10          * @CorrelationData 为自定义反馈信息;
    11          * @cause 为投递失败的原因;
    12          */
    13         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    14             @Override
    15             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    16                 if(!ack){
    17                     logger.error("correlationData:"+ correlationData);
    18                     logger.error("ack:"+ack);
    19                     logger.error("cause:"+cause);
    20                 }
    21             }
    22         });
    23 
    24         //消息内容
    25         Map<String,String> map = new HashMap<>();
    26         map.put("message","testing confire function");
    27 
    28         //设置自定义反馈消息
    29         String uuid = UUID.randomUUID().toString();
    30         logger.info("消息唯一ID:"+uuid);
    31         CorrelationData correlationData = new CorrelationData();
    32         correlationData.setId(uuid);
    33 
    34         //并不存在名为“exchange-dog”的exchange
    35         rabbitTemplate.convertAndSend("exchange-dog","dog",map,correlationData);
    36 
    37     }

      测试结果

    1  c.d.amqp.SpringBootAmqpApplicationTests  : 消息唯一ID:e6601e83-fad7-4b53-9968-c74828e62b23
    2  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.22.130:5672]
    3  o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#f7cdf8:0/SimpleConnection@397ef2 [delegate=amqp://guest@192.168.22.130:5672/, localPort= 8055]
    4  o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)
    5  c.d.amqp.SpringBootAmqpApplicationTests  : correlationData:CorrelationData [id=e6601e83-fad7-4b53-9968-c74828e62b23]
    6  c.d.amqp.SpringBootAmqpApplicationTests  : ack:false
    7  c.d.amqp.SpringBootAmqpApplicationTests  : cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)

      注意:Confirm模式只管有无投递到exchange,而不管有无发送到队列当中。

    二、返回模式

      1.什么是返回模式?

        当消息未投递到queue时的反馈。

      2.测试

      配置:

    1 #开启返回模式
    2spring.rabbitmq.publisher-returns=true

      测试方法:

     1     @Test
     2     void messageSendTestWithReturn(){
     3         /*
     4          * 设置消息返回回调方法;
     5          * 该方法执行时则表示消息投递失败
     6          * @message 为反馈信息;
     7          * @replyCode 一个反馈代码,表示不同投递失败原因;
     8          * @replyText 反馈信息
     9          */
    10         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    11             @Override
    12             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    13                 logger.error("返回消息配置:"+message.getMessageProperties().toString());
    14                 logger.error("反馈代码:"+replyCode);
    15                 logger.error("反馈内容:"+replyText);
    16                 logger.error("exchange:"+exchange);
    17                 logger.error("routingKey:"+routingKey);
    18             }
    19         });
    20 
    21         //消息内容
    22         Map<String,String> map = new HashMap<>();
    23         map.put("message","testing return function");
    24 
    25         //并不存在名为“dog”的routingKey,即投不到现有的queue里
    26         rabbitTemplate.convertAndSend("exchange-direct","dog",map);
    27     }

      测试结果:

    1 c.d.amqp.SpringBootAmqpApplicationTests  : 返回消息配置:MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]
    2 c.d.amqp.SpringBootAmqpApplicationTests  : 反馈代码:312
    3 c.d.amqp.SpringBootAmqpApplicationTests  : 反馈内容:NO_ROUTE
    4 c.d.amqp.SpringBootAmqpApplicationTests  : exchange:exchange-direct
    5 c.d.amqp.SpringBootAmqpApplicationTests  : routingKey:dog

    三、限流策略(手动应答消息)

      1.为什么要限流?

        若队列中消息积压过多,突然开启监听,会导致消费端崩溃。

      2.如何限流?

        使用RabbitMQ提供的Qos(服务质量保证)功能,如果一定数目消息的未被应答前,不再接受新消息。

      3.测试

      配置

    1 #手动消息应答
    2 spring.rabbitmq.listener.simple.acknowledge-mode=manual

      

      测试

     1     /*
     2      * 消息手动应答
     3      * @RabbitListener注解监听来自指定队列的消息
     4      */
     5 
     6     @RabbitListener(queues = "springboot-queue")
     7     public void revice(Message message,Channel channel) throws IOException {
     8         try{
     9             logger.info("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
    10             logger.info("消息标签:"+String.valueOf(message.getMessageProperties().getDeliveryTag()));
    11             /* 设置Qos机制
    12              * 第一个参数:单条消息的大小(0表示即无限制)
    13              * 第二个参数:每次处理消息的数量
    14              * 第三个参数:是否为consumer级别(false表示仅当前channel有效)
    15              */
    16             channel.basicQos(0,1,false);
    17             //手动应答消息  第一个参数是所确认消息的标识,第二参数是是否批量确认
    18             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    19             
    20         }catch (Exception e){
    21             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    22             logger.error("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
    23             logger.error("接收消息发送错误:"+e.getMessage());
    24         }
    25     }

    四、死信队列

      1.什么是死信?

        未被正常处理的消息。

      2.出现死信的情况

        (1)消息被拒绝(reject、nack),并且不重新放回原队列(requeue=false);

        (2)消息过期(设置了Expiration);

        (3)队列已满。

      3.配置

      以下代码配置了一个正常的exchange、queue和 一个专门接收死信的exchange、queue

      在队列中配置x-dead-letter-exchange参数,表示本队列出现死信,则转发到配置指定的交换器中去

     1     /*
     2      * 创建消息队列的对象:exchange、queue、绑定规则
     3      */
     4     @Test
     5     void createObjectOfMQ(){
     6 
     7         /*
     8          * 在普通队列上配置参数。表示若本队列有死信,则转发到配置指定的转发器中去
     9          * @参数键:x-dead-letter-exchange
    10          * @参数名:接收死信的交换器
    11          */
    12         Map<String,Object> arguments = new HashMap<>();
    13         arguments.put("x-dead-letter-exchange","springboot-dlx-exchange");
    14 
    15         amqpAdmin.declareExchange(new DirectExchange("springboot-direct"));
    16         amqpAdmin.declareQueue(new Queue("springboot.queue",true,false,false,arguments));
    17         amqpAdmin.declareBinding(new Binding("springboot.queue", Binding.DestinationType.QUEUE,"springboot-direct","springboot",null));
    18 
    19         //接收死信的交换器
    20         amqpAdmin.declareExchange(new TopicExchange("springboot-dlx-exchange"));
    21         //交换器收到的死信都转发到该队列,#表示接收所有消息
    22         amqpAdmin.declareQueue(new Queue("springboot-dlx.queue",true));
    23         amqpAdmin.declareBinding(new Binding("springboot-dlx.queue", Binding.DestinationType.QUEUE,"springboot-dlx-exchange","#",null));
    24     }

      运行以上方法后,可在RabbitMQ管理界面看到两个exchange、两个queue成功创建

      4.发送消息

     1    @Test
     2     void sendMessageWithTTL(){
     3         String str = "dlx test";
     4 
     5         //setExpiration表示设置该消息存活时间
     6         //5秒后该消息未被消息,则转发到死信交换器
     7         Message message = MessageBuilder.withBody(str.getBytes())
     8                 .setExpiration("5000")
     9                 .setContentEncoding("UTF-8")
    10                 .setMessageId("dlx-001")
    11                 .build();
    12 
    13         //将消息发送到配置了"x-dead-letter-exchange"参数的队列
    14         rabbitTemplate.convertAndSend("springboot-direct","springboot",message);
    15     }

       发送完消息,过5秒之后可以看到信息已被投递到死信队列中去了

     

    setExpiration
  • 相关阅读:
    混合式应用开发之AngularJS ng-repeat数组有重复值的解决方法
    混合式应用开发之串口通讯(2)
    混合式应用开发之串口通讯(1)
    第一篇博客
    win10出现"本地计算机上的MySQL57服务启动后停止"
    彻底区分html的attribute与dom的property
    Angularv4入门篇1
    node开发后将本地mysql数据导入到服务器mysql
    weex入门
    Color.js 方便修改颜色值
  • 原文地址:https://www.cnblogs.com/Drajun/p/12301304.html
Copyright © 2011-2022 走看看