zoukankan      html  css  js  c++  java
  • RabbitMq防止消息丢失

    RabbitMq如何防止消息丢失?

    之前一直使用RocketMq。由于工作原因项目中用到RabbitMQ,在使用之前还是有必要了解下。所以带着第一个问题查询了些资料。

    MQ若想避免消息丢失,当然只能做的尽量。除了各种MQ不同的主备或者集群策略外,总的指导原则就是:

    1、生产者->broker 消息不丢失。 2、broker 消息不丢失, 3、broker->Consumer消息不丢失

    每种MQ对于上面三个问题都有自己的解决方案。对于rabbitMq如何解决这个问题。落实到代码层面上:

    1、生产者到broker消息不丢失(消息持久化到磁盘)

     使用springboot集成,由于经过了一层层封装,所以经过查看源码,发现最后通过channel.basicPublish()向mq发布消息的时候需要设置 messageProperities。

    而MessageProperties使用Message中读取的,所以从哪里如何设置MessageProperties成了问题。好在发现SpringBoot增加MessageProcess这一接口,然后可以定制自己信息

    (1)message持久化

    1  rabbitTemplate.convertAndSend(rabbitMqProperty.getOrderMsg().getExchangeName(), rabbitMqProperty.getOrderMsg().getRouteKeyName(), messages, new MessagePostProcessor() {
    2     @Override
    3  public Message postProcessMessage(Message message) throws AmqpException {
    4         // 消息持久化
    5  message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    6  return message;
    7  }
    8 });

    (2)return-callback设置

         设置RabbitMq publisher-returns 属性为true, 即消息没到成功到达queue会触发CallBack回调。  即 生产者的 确认机制

     1   @Bean
     2     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
     3         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
     4         rabbitTemplate.setMandatory(true);
     5         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
     6         rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
     7             mqSendFailProcesser.retrySendMessage(rabbitTemplate, message, replyCode, replyText, exchange, routingKey);
     8         });
     9         return rabbitTemplate;
    10     }

    2、broker消息不丢失

          队列Queue和交换机持久化到磁盘

    1     @Bean
    2     public Queue payNofityQueue() {
    3         return  QueueBuilder.durable(payNotifyMqProperty.getQueueName()).build();
    4     }
    5 
    6     @Bean
    7     public DirectExchange exchange() {
    8         return new DirectExchange(payNotifyMqProperty.getExchangeName(), true, false);
    9     }

      

    3、broker到消费者消息不丢失(ACK应答)

    1 失败响应
    2 
    3 private void sendNack(Message message, Channel channel) throws IOException {
    4     channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
    5 }
    6 成功响应
    7 private void sendAck(Message message, Channel channel) throws IOException {
    8     channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    9 }
  • 相关阅读:
    IOS使用正则表达式去掉html中的标签元素,获得纯文本
    iOS 拨打电话的三种方式总结
    iOS中Block的基础用法
    如何避免在Block里用self造成循环引用
    对MAC自带的SVN进行升级
    IOS开发之记录用户登陆状态
    Xcode7 添加PCH文件
    mysql upgrade
    Ubuntu下更改用户名和主机名
    mysql 查询的时候没有区分大小写的解决方案
  • 原文地址:https://www.cnblogs.com/mxmbk/p/9411288.html
Copyright © 2011-2022 走看看