zoukankan      html  css  js  c++  java
  • RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

    继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题?

    回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情?

    • 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理
    • 消息的持久化处理

    上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储。

    本文先从消费端讲起:

    RabbitMQ Server到消费者消息如何不丢?

    上面一篇文章也提到了,消费者获取到消息之后,没有来得及处理完毕,自己直接宕机了,因为消息者默认采用自动ack,此时RabbitMQ的自动ack机制会通知MQ Server这条消息已经处理好了,此时消息就丢了,并不是预期的。

    那么我们采用手动ack机制来解决这个问题,消费端处理完逻辑之后再通知MQ Server,这样消费者没处理完消息不会发送ack,如果在消费者拿到消息,没来得及处理的情况下自己挂了,此时MQ集群会自动感知到,它就会自觉的重发消息给其他的消费者服务实例。

    根据上面的思路你需要完成下面的两步操作:

    第一:消费者监听设置手动ack

    1. this.channel = channelManager.getListenerChannel(namespace);
    2. this.queue = queue;
    3. this.channel.basicConsume(queue, false, consumerTag, this);
    4. this.disconnectedCallback.setChannel(channel);

    核心代码: this.channel.basicConsume(queue, false, consumerTag, this); 第二个参数设置 false 代表不自动ack

    第二:业务执行完成后手动ack

    1. public static void ack(MessageContext context) {
    2. long deliveryTag = context.getEnvelope().getDeliveryTag();
    3. try {
    4. context.getChannel().basicAck(deliveryTag, false);
    5. } catch (IOException e) {
    6. throw new MqAckException("消息ack出错:连接异常或远端关闭", context, e);
    7. }
    8. }

    核心代码: context.getChannel().basicAck(deliveryTag, false);

    这里封装来,需要业务在执行完自己的业务代码后,调用对象channel 的ack方法通知MQServer,说我这边执行完了,你可以删除了。

    注意这里有个问题: 如果忘记调用这个 context.getChannel().basicAck(deliveryTag, false);

    或者因为代码异常,这个代码没被执行,会怎么样?后面找时间再写一篇文章讲这个问题。

    RabbitMQ Server中存储的消息高可用

    当我们解决了,生产端和消费端的问题后,基本保证消息的不丢问题,但是还有一个是消息的高可用问题,单节点问题,普通节点的问题都会影响消息的临时不可用,这个时候要用上我们的HA 镜像集群模式来保证。

    上一篇文章 解决RabbitMQ消息丢失问题和保证消息可靠性(一) 已经提到过,服务端消息部署的三种模式的区别,今天就专门讲镜像模式的介绍。

    镜像模式至少采用3节点,2个磁盘节点和1个内存节点来保证,架构图:

    设置镜像也有一些策略:

    • 同步至所有的,一般不这么做,性能会受到极大影响
    • 同步最多N个机器
    • 只同步至符合指定名称的nodes

    命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

    1. 为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
    1. rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    2. rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    1. 为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式
    1. rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat"
    2. '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
    1. 为每个以“node.”开头的队列分配指定的节点做镜像
    1. rabbitmqctl set_policy ha-nodes "^nodes."
    2. '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

    但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降

    所以采用镜像模式,要根据具体的业务规则定制话处理,没那么重要的业务,消息丢了也没关系的场景,又要求必须高的性能的时候,镜像也可以不用设置。

    总结

    两篇文章的讲解,分析了消息中间件高可用问题的大概的思路,没有具体的代码详细,如有疑问可以下方留言评论,我会及时回复解答,后面我会逐步完善相关细节,欢迎多多关注。

    后面计划更新文章如下:

    • 什么情况会导致重复消费并怎么解决?
    • 什么样的真实业务场景需要保障顺序性和如何保证消息的顺序性?
    • 如何通过消息队列优雅的解决微服务间接口失败的重试?

    推荐阅读

    解决RabbitMQ消息丢失问题和保证消息可靠性(一)

    IntelliJ IDEA提升效率开发插件必备

    END

    如有收获,请帮忙转发,后续会有更好文章贡献,您的鼓励是作者最大的动力!

    欢迎关注我的公众号:架构师的修炼,获得独家整理的学习资源和日常干货推送。

  • 相关阅读:
    hdu2328 Corporate Identity
    hdu1238 Substrings
    hdu4300 Clairewd’s message
    hdu3336 Count the string
    hdu2597 Simpsons’ Hidden Talents
    poj3080 Blue Jeans
    poj2752 Seek the Name, Seek the Fame
    poj2406 Power Strings
    hust1010 The Minimum Length
    hdu1358 Period
  • 原文地址:https://www.cnblogs.com/flyrock/p/11437512.html
Copyright © 2011-2022 走看看