zoukankan      html  css  js  c++  java
  • rabbitmq 不发送ack消息如何处理: RabbitMQ 消息确认以及消息消费方处理消息时候抛出了异常以

    本篇的代码使用的前面两篇文章《RabbitMQ与Spring整合之消息生产方》和《RabbitMQ与Spring整合之消息消费方》的代码,这两篇文件里配置文件的名称不正确,不可直接运行。

    一 自动确认机制

    在服务消费者rabbitmq.xml 做修改:

    1.  
      <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    2.  
      <rabbit:listener-container acknowledge="auto"
    3.  
      connection-factory="connectionFactory">
    4.  
      <rabbit:listener queues="spring_queue_test_01" ref="consumerService" />
    5.  
      </rabbit:listener-container>                             

    添加了配置 acknowledge="auto",这里来配置mq的确认机制,auto 自动确认,这也是默认缺省的配置。

    特点:消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    ConsumerService.java

    1.  
      package cn.mn.app;
    2.  
      import org.springframework.amqp.core.Message;
    3.  
      import org.springframework.amqp.core.MessageListener;
    4.  
      public class ConsumerService implements MessageListener{
    5.  
       
    6.  
      public void onMessage(Message msg) {
    7.  
      Object obj=null;
    8.  
      System.out.println("msg-------->:"+new String(msg.getBody()));
    9.  
      try {
    10.  
      Thread.sleep(10000);
    11.  
      } catch (InterruptedException e) {
    12.  
      e.printStackTrace();
    13.  
      }
    14.  
      System.out.println("休眠结束,5秒后异常-----------------");
    15.  
      try {
    16.  
      Thread.sleep(5000);
    17.  
      } catch (InterruptedException e) {
    18.  
      e.printStackTrace();
    19.  
      }
    20.  
      System.out.println(obj.toString());
    21.  
      }
    22.  
       
    23.  
      }

    可以看到同一条消息不断在控制台打印出来,不断的抛出空指针。在MQ管理界面上看到消息始终存在。

    二 手动确认    

    1.  
      <?xml version="1.0" encoding="UTF-8"?>
    2.  
      <beans xmlns="http://www.springframework.org/schema/beans"
    3.  
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    4.  
      xsi:schemaLocation="http://www.springframework.org/schema/beans
    5.  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    6.  
      http://www.springframework.org/schema/beans
    7.  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    8.  
      http://www.springframework.org/schema/rabbit
    9.  
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
    10.  
      <!--配置connection-factory,指定连接rabbit server参数 -->
    11.  
      <rabbit:connection-factory id="connectionFactory"
    12.  
      host="127.0.0.1" />
    13.  
       
    14.  
      <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    15.  
      <rabbit:admin connection-factory="connectionFactory" />
    16.  
       
    17.  
      <!--定义queue,如果mq服务器中没,服务器会自动创建 -->
    18.  
      <rabbit:queue name="spring_queue_test_01" durable="true"
    19.  
      auto-delete="false" exclusive="false" />
    20.  
       
    21.  
      <!-- 定义direct exchange,绑定,如果服务器中没有会自动创建 -->
    22.  
      <rabbit:direct-exchange name="spring_exchange_test_01"
    23.  
      durable="true" auto-delete="false">
    24.  
      <rabbit:bindings>
    25.  
      <rabbit:binding queue="spring_queue_test_01" key="spring_queue_test_01_key"></rabbit:binding>
    26.  
      </rabbit:bindings>
    27.  
      </rabbit:direct-exchange>
    28.  
       
    29.  
      <!-- 消息接收者 -->
    30.  
      <bean id="consumerService" class="cn.mn.app.ConsumerService"></bean>
    31.  
      <bean id="cnsumerServiceManu" class="cn.mn.app.ConsumerServiceManu"></bean>
    32.  
       
    33.  
       
    34.  
      <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    35.  
      <rabbit:listener-container acknowledge="manual"
    36.  
      connection-factory="connectionFactory">
    37.  
      <rabbit:listener queues="spring_queue_test_01" ref="cnsumerServiceManu" />
    38.  
      </rabbit:listener-container>
    39.  
       
    40.  
      </beans>
    acknowledge="manual" 监听类有所变化,从MessageListener换成ChannelAwareMessageListener,因为这个接口可以提供channel,使用channel来发送确认信号。
     
    ConsumerServiceManu.java
    1.  
      package cn.mn.app;
    2.  
       
    3.  
      import java.io.IOException;
    4.  
       
    5.  
      import org.springframework.amqp.core.Message;
    6.  
      import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    7.  
       
    8.  
      import com.rabbitmq.client.Channel;
    9.  
       
    10.  
      public class ConsumerServiceManu implements ChannelAwareMessageListener {
    11.  
       
    12.  
      public void onMessage(Message message, Channel channel) throws IOException {
    13.  
      System.out.println("consumer--:" + message.getMessageProperties()
    14.  
      + ":" + new String(message.getBody()));
    15.  
      //确认
    16.  
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    17.  
      }
    18.  
      }
    特点:只有服务端收到确认信号,即channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);确认成功,消息才会移除,确认成功后不管后面是异常还是断开服务消息已经被移除了。如果在确认之前抛出异常,消息不会移除,也不会重试,监听程序会因为异常停止不再处理消息    ,如果此时断开服务,消息重新回到队列。
     

    三 不适用确认

    1.  
      <rabbit:listener-container acknowledge="none"
    2.  
      connection-factory="connectionFactory">
    3.  
      <rabbit:listener queues="spring_queue_test_01" ref="consumerService" />
    4.  
      </rabbit:listener-container>

    特点:acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

    https://blog.csdn.net/liangwenmail/article/details/80542619

  • 相关阅读:
    Centos7安装docker
    Centos 7快速安装之packstack
    mysql基础知识复习
    Linux系统部署samba服务记录
    简单python程序练习
    Docker 搭建pxc集群 + haproxy + keepalived 高可用(二)
    Docker 搭建pxc集群 + haproxy + keepalived 高可用(一)
    linux下的find文件查找命令与grep文件内容查找命令
    db2创建nickname
    oracle 启动报ORA-01105 ORA-19808
  • 原文地址:https://www.cnblogs.com/softidea/p/9450088.html
Copyright © 2011-2022 走看看