zoukankan      html  css  js  c++  java
  • rabbitmq 不发送ack消息如何处理:rabbitmq可靠发送的自动重试机制

    http://www.jianshu.com/p/4112d78a8753

    接这篇

    在上文中,主要实现了可靠模式的consumer。而可靠模式的sender实现的相对简略,主要通过rabbitTemplate来完成。
    本以为这样的实现基本是没有问题的。但是前段时间做了一个性能压力测试,但是发现在使用rabbitTemplate时,会有一定的丢数据问题。

    当时的场景是用30个线程,无间隔的向rabbitmq发送数据,但是当运行一段时间后发现,会出现一些connection closed错误,rabbitTemplate虽然进行了自动重连,但是在重连的过程中,丢失了一部分数据。当时发送了300万条数据,丢失在2000条左右。
    这种丢失率,对于一些对一致性要求很高的应用(比如扣款,转账)来说,是不可接受的。

    在google了很久之后,在stackoverflow上找到rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加connection数:

    connectionFactory.setChannelCacheSize(100);

    修改之后,确实不再出现connection closed这种错误了,在发送了3000万条数据后,一条都没有丢失。
    似乎问题已经完美的解决了,但是我又想到一个问题:当我们的网络在发生抖动时,这种方式还是不是安全的?
    换句话说,如果我强制切断客户端和rabbitmq服务端的连接,数据还会丢失吗?

    为了验证这种场景,我重新发送300万条数据,在发送过程中,在rabbitmq的管理界面上点击强制关闭连接:

    然后发现,仍然存在丢失数据的问题。

    看来这个问题,没有想象中的那么简单了。

    在阅读了部分rabbitTemplate的代码之后发现:
    1 rabbitTemplate的ack确认机制是异步的
    2 这种确认机制是一种事后发现机制,并不能同步的发现问题
    也就是说,即便打开了

    1.  
      connectionFactory.setPublisherConfirms(true);
    2.  
      rabbitTemplate.setMandatory(true);

    并且实现了:

    1.  
      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    2.  
      if (!ack) {
    3.  
      log.info("send message failed: " + cause + correlationData.toString());
    4.  
      }
    5.  
      });

    依旧是不安全的。
    rabbitTemplate的发送流程是这样的:
    1 发送数据并返回(不确认rabbitmq服务器已成功接收)
    2 异步的接收从rabbitmq返回的ack确认信息
    3 收到ack后调用confirmCallback函数
    注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

    在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

    最完美的解决方案只有1种:
    使用rabbitmq的事务机制。
    但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。
    第二种解决方式,使用同步的发送机制,也就是说,客户端发送数据,rabbitmq收到后返回ack,再收到ack后,send函数才返回。代码类似这样:

    1.  
      创建channel
    2.  
      send message
    3.  
      wait for ack(or 超时)
    4.  
      close channel
    5.  
      返回成功or失败

    同样的,由于每次发送message都要重新建立连接,效率很低。

    基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
    在rabbitTemplate异步确认的基础上
    1 在本地缓存已发送的message
    2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
    3 定时扫描本地的message,如果大于一定时间未被确认,则重发

    当然了,这种解决方式也有一定的问题:
    想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。
    自动重试的代码如下:

    1.  
      public class RetryCache {
    2.  
      private MessageSender sender;
    3.  
      private boolean stop = false;
    4.  
      private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
    5.  
      private AtomicLong id = new AtomicLong();
    6.  
       
    7.  
      @NoArgsConstructor
    8.  
      @AllArgsConstructor
    9.  
      @Data
    10.  
      private static class MessageWithTime {
    11.  
      long time;
    12.  
      Object message;
    13.  
      }
    14.  
       
    15.  
      public void setSender(MessageSender sender) {
    16.  
      this.sender = sender;
    17.  
      startRetry();
    18.  
      }
    19.  
       
    20.  
      public String generateId() {
    21.  
      return "" + id.incrementAndGet();
    22.  
      }
    23.  
       
    24.  
      public void add(String id, Object message) {
    25.  
      map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
    26.  
      }
    27.  
       
    28.  
      public void del(String id) {
    29.  
      map.remove(id);
    30.  
      }
    31.  
       
    32.  
      private void startRetry() {
    33.  
      new Thread(() ->{
    34.  
      while (!stop) {
    35.  
      try {
    36.  
      Thread.sleep(Constants.RETRY_TIME_INTERVAL);
    37.  
      } catch (InterruptedException e) {
    38.  
      e.printStackTrace();
    39.  
      }
    40.  
       
    41.  
      long now = System.currentTimeMillis();
    42.  
       
    43.  
      for (String key : map.keySet()) {
    44.  
      MessageWithTime messageWithTime = map.get(key);
    45.  
       
    46.  
      if (null != messageWithTime) {
    47.  
      if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
    48.  
      log.info("send message failed after 3 min " + messageWithTime);
    49.  
      del(key);
    50.  
      } else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
    51.  
      DetailRes detailRes = sender.send(messageWithTime.getMessage());
    52.  
       
    53.  
      if (detailRes.isSuccess()) {
    54.  
      del(key);
    55.  
      }
    56.  
      }
    57.  
      }
    58.  
      }
    59.  
      }
    60.  
      }).start();
    61.  
      }
    62.  
      }

    在client端发送之前,先在本地缓存message,代码如下:

    1.  
      @Override
    2.  
      public DetailRes send(Object message) {
    3.  
      try {
    4.  
      String id = retryCache.generateId();
    5.  
      retryCache.add(id, message);
    6.  
      rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
    7.  
      } catch (Exception e) {
    8.  
      return new DetailRes(false, "");
    9.  
      }
    10.  
       
    11.  
      return new DetailRes(true, "");
    12.  
      }

    在收到ack时删除本地缓存,代码如下:

    1.  
      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    2.  
      if (!ack) {
    3.  
      log.info("send message failed: " + cause + correlationData.toString());
    4.  
      } else {
    5.  
      retryCache.del(correlationData.getId());
    6.  
      }
    7.  
      });

    再次验证刚才的场景,发送300w条数据,在发送的过程中过一段时间close一次connection,发送结束后,实际发送数据301.2w条,有一些重复,但是没有丢失数据。
    同时需要验证本地缓存的内存泄露问题,程序连续发送1.5亿条数据,内存占用稳定在900M,并没有明显的波动。

    最后贴一下rabbitmq的性能测试数据:
    1 300w条1k的数据,单机部署rabbitmq(8核,32G)
    在ack确认模式下平均发送效率为1.1w条/秒
    非ack确认模式下平均发送效率为1.6w条/秒

    2 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
    在ack确认模式下平均发送效率为1.3w条/秒
    非ack确认模型下平均发送效率为1.7w条/秒

    3 300w条1k的数据,单机部署rabbitmq(8核,32G)
    在ack确认模式下平均消费效率为9000条/秒

    4 300w条1k的数据,cluster模式部署3台(8核*3, 32G*3)
    在ack确认模式下平均消费效率为1w条/秒


    代码地址:

    https://github.com/littlersmall/rabbitmq-access

     
     
    https://blog.csdn.net/jmdonghao/article/details/76153757
  • 相关阅读:
    November 13th 2016 Week 47th Sunday The 1st Day
    November 12th 2016 Week 46th Saturday
    November 11th 2016 Week 46th Friday
    November 10th 2016 Week 46th Thursday
    November 9th 2016 Week 46th Wednesday
    November 8th 2016 Week 46th Tuesday
    windows 7文件共享方法
    Win7无线网络共享设置方法
    常量指针和指针常量
    如何查找局域网的外网ip
  • 原文地址:https://www.cnblogs.com/softidea/p/9450075.html
Copyright © 2011-2022 走看看