zoukankan      html  css  js  c++  java
  • 使用RabbitMQ

    开发中使用RabbitMQ的注意事项

    https://blog.csdn.net/qq_28334711/article/details/60963639

    使用消息队列处理消息的时候,我们可能会遇到以下问题:
    消息处理失败
    消息体本身有误
    消息重复处理
    消息丢失
    对于消息处理失败,有可能有由于网络波动导致的数据处理异常,待网络稳定时消息就会正常处理,对于这种处理失败,我们应该继续尝试去处理消息。
    消息体本身有误,这会导致消息连续处理失败,占用较多的资源,写大量的无用日志,这种错误应该丢弃这部分无用消息,但要记录下日志,记清消息体本身数据,以及丢弃消息的原因。
    消息重复处理,例如我们通过消息队列向数据库中添加数据,由于数据库网络波动,导致数据库连接超时,而我们的系统认为消息处理失败,就会把消息回滚到消息队列,继续尝试处理,这时就会造成消息重复处理的现象,对于重要的消息,我们可以每处理一条消息,就记录一下,处理新的消息时,进行判断消息是否已经处理,如果已经处理,就丢弃消息。
    由于Spring 与RabbitMq集成 对消息的处理方式是默认自动应答,也就是处理消息时无论是否出现异常,都会给消息队列应答处理成功,消息队列删除消息,这时就会出现消息丢失的情况,为了解决这个问题,我们需要使用手动应答的方式处理消息。
    rabbitMQ消费者监听器的配置
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener queues="queueName" ref="Listiner"/>
    </rabbit:listener-container>
    acknowledge="manual" 就表示该监听器手动应答消息
    消费者监听器的编写
    为了能够手动应答消息,我们编写的监听器需要实现ChannelAwareMessageListener,重写onMessage()方法,里面有两个参数Message message, Channel channel,Message 是消息体本身,Channel是RabbitMQ的连接通道。
    异常的处理
    消息处理正常,没有抛出异常,这时我们需要手动应答消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    当出现异常时,我们需要把这个消息回滚到消息队列,有两种方式
    //ack返回false,并重新回到队列,api里面解释得很清楚
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    //拒绝消息
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

    经过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息,进行处理,接着抛出异常,进行回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。对于消息回滚到消息队列,我们希望比较理想的方式时出现异常的消息到达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,因此我们采取的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息到消息队列,这时就实现了错误消息进行消息队列尾部的方案。
    //手动进行应答
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    //重新发送消息到队尾
    channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(new Object()));

    对于第三条中的解决方案会存在一个问题,如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案
    一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。
    另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就 丢弃该消息,但需要记录详细的日志。
    使用手动应答消息,有一点需要特别注意,那就是不能忘记应答消息,因为对于RabbitMQ来说处理消息没有超时,只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响业务执行。

  • 相关阅读:
    一个JAVA题引发的思考
    eclipse好玩的插件集(一) CKEditor插件
    Log4J使用实例---日志进行邮件发送或是存入数据库
    log4j输出到数据库(输出自定义参数、分级保存)
    String和StringBuffer的一点研究
    String、StringBuffer、StringBuilder区分和性能比较
    最新eclipse安装SVN插件
    jsoup select 选择器
    网页导出excel文件
    Dom4j完整教程
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/9851960.html
Copyright © 2011-2022 走看看