zoukankan      html  css  js  c++  java
  • rabbitmq可靠性

    本文翻译汇总自rabbitmq的官方文档。

    翻译使用谷歌翻译后简单修改,部分内容读起来仍然比较晦涩,不过意思传达到了。

    可靠性指南

     

    本页介绍了如何使用AMQP和RabbitMQ的各种功能来实现可靠的传送 - 确保消息始终被传递,甚至在系统的任何部分遇到故障。

     

    什么可以失败?

     

    网络问题可能是最常见的失败类。网络不仅可能出现故障,防火墙可以中断空闲连接,并且不会立即检测到网络故障。

     

    除了连接故障之外,broker和客户端应用程序可能会随时遇到硬件故障(或软件崩溃)。此外,即使客户端应用程序持续运行,逻辑错误也可能导致通道或连接错误,迫使客户端建立新的通道或连接,并从问题中恢复。

     

    连接失败

     

    在连接失败的情况下,客户端将需要与broker建立新的连接。以前连接中打开的任何通道都将自动关闭,这些通道也需要重新打开。

     

    一般来说,当连接失败时,客户端将被连接引发异常(或类似的语言结构)通知。官方Java和.NET客户端还提供了回调方法,让您听到其他上下文中的连接失败 - Java在Connection和Channel类上都提供了ShutdownListener回调,.NET客户端提供了IConnection.ConnectionShutdown和IModel.ModelShutdown事件目的。

     

    Acknowledgements和confirm

     

    当连接失败时,消息可能在客户端和服务器之间传输 - 它们可能处于被解析或生成的中间,在OS缓冲区或电线上。传输中的消息将丢失 - 它们将需要重传。Acknowledgements让服务器和客户端知道何时这样做。

     

    Acknowledgements可以在两个方向使用 - 允许消费者向服务器指示它已经接收/处理了消息,并允许服务器向生产者指示相同的东西。 RabbitMQ将后一种情况称为"confirm"。

     

    当然,TCP确保已经接收到数据包,并且将重新发送,直到它们 - 但这只是网络层。Acknowledgements和confirm表明已收到消息并采取行动。confirm信号表示接收到消息,并且转让所有权,接收方承担全部责任。

     

    Acknowledgements因此具有语义 - 消费的应用程序不应该confirm消息,直到它完成了与它们需要的任何操作 - 将它们记录在数据库中,转发它们,将它们打印到纸张或其他任何东西上。一旦这样做,broker可以自由地忘记该消息。

     

    同样,broker一旦承担责任,就会confirm消息(见这里是什么意思)。

     

    confirm的使用保证至少一次Delivery。没有confirm,在发布和消费操作期间可能发生消息丢失,并且只有最多的一次Delivery才能得到保证。

     

    用心跳检测死TCP连接

     

    在某些类型的网络故障中,数据包丢失可能意味着中断的TCP连接需要较长时间(例如,在Linux上使用默认配置约11分钟)才能被操作系统检测到。 AMQP 0-9-1提供心跳功能,以确保应用程序层及时发现连接中断(以及完全无响应的对等体)。心跳也可以防止可能终止"空闲"TCP连接的某些网络设备。有关详细信息,请参阅心跳。

     

    在broker

     

    为了避免在broker中丢失消息,我们需要应对broker重新启动,broker硬件故障,甚至是甚至broker崩溃。

     

    为了确保重新启动时消息和broker定义生效,我们需要确保它们在磁盘上。 AMQP标准具有交换,队列和持久消息的耐久性概念,要求持久对象或持久消息将在重新启动后生存。有关持久性和持久性的具体标志的更多详细信息,请参见"AMQP概念指南"。

    群集和高可用性

     

    如果我们需要确保我们的broker幸存硬件故障,我们可以使用RabbitMQ的集群。在RabbitMQ集群中,所有定义(交换,绑定,用户等)都跨整个集群镜像。队列的行为方式不同,默认情况下只驻留在单个节点上,但可以跨多个或所有节点进行镜像。队列保持可见,并且可以从所有节点访问,无论它们位于何处。

     

    镜像队列在所有已配置的集群节点之间复制其内容,可以无缝地容忍节点故障,并且不会丢失消息(尽管请参阅非同步从站上的此注释)。然而,消费应用程序需要注意,当队列失败时,消费者将被取消,他们将需要重新考虑 - 有关详细信息,请参阅文档。

     

    在Producer

     

    当使用confirms时,从通道恢复的生产者或连接故障应重发任何尚未从broker收到confirm的消息。这里存在消息重复的可能性,因为broker可能发送了一个从未到达生产者的confirm(由于网络故障等)。因此,消费者应用程序将需要以幂等(重复执行的效果一致)方式执行重复数据删除或处理传入的消息。

     

    确保消息路由

     

    在某些情况下,生产者可能很重要的是确保他们的消息被路由到队列(尽管并不总是 - 在公共子系统生产者只会发布的情况下,如果没有消费者感兴趣,那么消息是正确的丢弃)。

     

    为了确保消息被路由到一个已知的队列,生产者只能声明一个目标队列并直接发布给它。如果消息可能以更复杂的方式进行路由,但是生产者仍然需要知道他们是否到达了至少一个队列,则可以在basic.publish上设置mandatory标志,确保basic.return(包含回复码和一些文本解释)将被发送回客户端,如果没有队列被适当地绑定。

     

    在消费者

     

    在网络故障(或节点崩溃)的情况下,可能消息重复,消费者必须准备好处理它们。如果可能,最简单的方法是确保您的消费者以幂等方式处理消息,而不是明确处理重复数据消除。

     

    不能处理的消息

     

    如果消费者确定它不能处理消息,那么它可以使用basic.reject(或basic.nack)拒绝它,要求服务器重新启动它(在这种情况下,服务器可能被配置为死信)代替。

     

     

    消费者Acknowledgements和Producerconfirm

     

    介绍

     

    使用消息传递broker(如RabbitMQ)的系统按照定义分布。由于发送的协议方法(消息)不能保证到达对等体或被其成功处理,所以发布者和消费者都需要一种用于传送和处理confirm的机制。 RabbitMQ支持的几种消息协议提供了这样的功能。

     

    (消费者)DeliveryAcknowledgements

     

    当RabbitMQ向消费者发送消息时,需要知道何时成功发送消息。什么样的逻辑优化取决于系统。因此,它主要是应用程序的决定。

     

    在我们继续讨论其他主题之前,重要的是要解释Delivery是如何被识别的(而且confirm表明他们各自的Delivery)。当消费者(订阅)注册时,消息将由RabbitMQ使用basic.deliver方法传递(推送)。该方法携带Deliverytags,其唯一地标识信道上的传递。

     

    Deliverytags是单调增长的正整数,并由客户端库呈现。承认Delivery的客户端库方法将Deliverytags作为参数。

     

    频道预取设置(QoS)

     

    由于消息以异步方式发送(推送)到客户端,因此通常在任何给定时刻通常会有多个消息"在飞行中"。此外,客户端的手动confirm本质上也是异步的。所以有一个未被confirm的Deliverytags的滑动窗口。开发人员通常会倾向于限制此窗口的大小,以避免消费者端端的无限缓冲区问题。这是通过使用basic.qos方法设置"预取计数"值来完成的。该值定义了通道上允许的未confirmDelivery的最大数量。一旦数量达到配置的计数,RabbitMQ将停止在通道上传递更多消息,除非至少有一个未confirm的消息被confirm。

     

    例如,鉴于在通道Ch上未confirm的Deliverytags5,6,7和8设置为4,RabbitMQ不会再推送任何更多的Delivery,除非至少有一个未完成的Delivery被confirm。当通过delivery_tag设置为8的confirm帧到达该通道时,RabbitMQ将会注意并传递一条消息。

     

    值得重申的是,Delivery流程和手动客户端confirm完全是异步的。因此,如果在飞行中已经有Delivery时改变了预取值,则出现自然竞争条件,并且可能暂时超过在通道上预取计数未confirm的消息。

     

    可以为通道或消费者配置QoS设置。有关详细信息,请参阅消费者预取。

     

    即使在手动confirm模式下,QoS设置也不会影响使用basic.get("pull API")获取的消息。

     

     

    Producer confirm

     

    使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务 - 使信道事务发布,发布消息,提交。在这种情况下,交易是不必要的重量级,并将吞吐量降低250倍。为了弥补这一点,引入了confirm机制。它模仿了协议中已经存在的消费者confirm机制。

     

    要启用confirm,客户端发送confirm.select方法。根据是否设置不等待,broker可以通过confirm.select-ok进行回复。一旦在通道上使用了confirm.select方法,就被认为处于confirm模式。事务通道不能进入confirm模式,一旦通道处于confirm模式,则不能进行事务处理。

     

    一旦一个通道处于confirm模式,broker和客户端都会计数消息(从第一个confirm.select开始计数)。然后,broker通过在同一个频道上发送basic.ack来confirm消息。发送tags字段包含已confirm消息的序列号。broker还可以在basic.ack中设置多个字段,以指示所有到达并包含具有序列号的消息的消息已被处理。

     

    下面是Java中以confirm模式向通道发布大量消息并等待confirm的示例。

    // Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.

    //

    // This software, the RabbitMQ Java client library, is triple-licensed under the

    // Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2

    // ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see

    // LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,

    // please see LICENSE-APACHE2.

    //

    // This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,

    // either express or implied. See the LICENSE file for specific language governing

    // rights and limitations of this software.

    //

    // If you have any questions regarding licensing, please contact us at

    // info@rabbitmq.com.

     

     

    package com.rabbitmq.examples;

     

    import java.io.IOException;

     

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.ConnectionFactory;

    import com.rabbitmq.client.MessageProperties;

    import com.rabbitmq.client.QueueingConsumer;

     

    public class ConfirmDontLoseMessages {

    static int msgCount = 10000;

    final static String QUEUE_NAME = "confirm-test";

    static ConnectionFactory connectionFactory;

     

    public static void main(String[] args)

    throws IOException, InterruptedException

    {

    if (args.length > 0) {

    msgCount = Integer.parseInt(args[0]);

    }

     

    connectionFactory = new ConnectionFactory();

     

    // Consume msgCount messages.

    (new Thread(new Consumer())).start();

    // Publish msgCount messages and wait for confirms.

    (new Thread(new Publisher())).start();

    }

     

    @SuppressWarnings("ThrowablePrintedToSystemOut")

    static class Publisher implements Runnable {

    public void run() {

    try {

    long startTime = System.currentTimeMillis();

     

    // Setup

    Connection conn = connectionFactory.newConnection();

    Channel ch = conn.createChannel();

    ch.queueDeclare(QUEUE_NAME, true, false, false, null);

    ch.confirmSelect();

     

    // Publish

    for (long i = 0; i < msgCount; ++i) {

    ch.basicPublish("", QUEUE_NAME,

    MessageProperties.PERSISTENT_BASIC,

    "nop".getBytes());

    }

     

    ch.waitForConfirmsOrDie();

     

    // Cleanup

    ch.queueDelete(QUEUE_NAME);

    ch.close();

    conn.close();

     

    long endTime = System.currentTimeMillis();

    System.out.printf("Test took %.3fs ",

    (float)(endTime - startTime)/1000);

    } catch (Throwable e) {

    System.out.println("foobar :(");

    System.out.print(e);

    }

    }

    }

     

    static class Consumer implements Runnable {

    public void run() {

    try {

    // Setup

    Connection conn = connectionFactory.newConnection();

    Channel ch = conn.createChannel();

    ch.queueDeclare(QUEUE_NAME, true, false, false, null);

     

    // Consume

    QueueingConsumer qc = new QueueingConsumer(ch);

    ch.basicConsume(QUEUE_NAME, true, qc);

    for (int i = 0; i < msgCount; ++i) {

    qc.nextDelivery();

    }

     

    // Cleanup

    ch.close();

    conn.close();

    } catch (Throwable e) {

    System.out.println("Whoosh!");

    System.out.print(e);

    }

    }

    }

    }

    否定confirm

     

    在特殊情况下,当broker无法成功处理消息时,代替basic.ack,broker将发送一个basic.nack。在这种情况下,basic.nack的字段具有与basic.ack中相应的含义相同的含义,并且请求字段应该被忽略。broker表示无法处理消息,拒绝对其发送一则或多封消息;在这一点上,客户端可能会选择重新发布消息。

     

    通道置于confirm模式后,所有后续发布的消息将被confirm或不存在一次。不能保证消息被confirm多久。没有任何消息将被confirm和否定。

     

    如果在负责队列的Erlang进程中发生内部错误,则只会传递basic.nack。

     

    当消息被重新排队时,如果可能,它将被置于其队列中的原始位置。如果没有(由于多个消费者共享队列时由于其他消费者的并发Delivery和confirm),该消息将被重新排列到更接近队列头的位置。

     

    什么时候confirm message?

     

    对于不可路由的消息,一旦交换验证消息将不会路由到任何队列(返回空列表的队列),broker将发出confirm。如果消息也被发布为强制性,则basic.return将在basic.ack之前发送给客户端。否定的confirm也是如此(basic.nack)。

     

    对于可路由消息,当所有队列接受消息时,发送basic.ack。对于路由到持久队列的持久消息,这意味着持续到磁盘。对于镜像队列,这意味着所有镜像都已接受该消息。

     

    持久化消息的Ack延迟

     

    在将消息持续存储到磁盘后,将发送一个持久消息的basic.ack路由到持久化队列。 RabbitMQ消息存储在间隔(几百毫秒)之后分批地将消息存储到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。这意味着在一个恒定的负载下,basic.ack的延迟可以达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理confirm(作为流)或发布批次的消息,并等待未完成的confirm。客户端库之间的具体API有所不同。

     

    Producerconfirm的订购注意事项

     

    在大多数情况下,RabbitMQ将按照发布的相同顺序向Producerconfirm消息(这适用于在单个频道上发布的消息)。然而,发布者的confirm是异步发出的,可以confirm一个消息或一组消息。发出confirm的确切时刻取决于消息的传递模式(持久与瞬态)以及消息被路由到的队列的属性(见上文)。也就是说,不同的消息可以被认为是准备好在不同的时间进行confirm。这意味着与其各自的消息相比,confirm可以以不同的顺序到达。应用程序不应该依赖于confirm的顺序。

     

    Producer confirm和保证Delivery

     

    如果在所有消息写入磁盘之前崩溃,broker将丢失持久的消息。在某些情况下,这将导致broker以惊人的方式表现。

     

    例如,考虑这种情况:

     

    客户端向持久队列发布持久消息

    客户端从队列中消耗消息(指出消息是持久的,队列持久的),但是还没有确定,

    broker死亡并重新启动,

    客户端重新连接并开始消费消息。

    在这一点上,客户端可以合理地假设该消息将被再次发送。不是这样:重新启动导致broker丢失该消息。为了保证持久性,客户应该使用confirm。如果Producer的频道处于confirm模式,Producer将不会收到丢失的消息的confirm(因为该消息尚未写入磁盘)。

    限制

     

    最大Deliverytags

     

    Deliverytags是一个64位长的值,因此其最大值为9223372036854775807.由于每个渠道的Deliverytags是范围限定的,所以Producer或消费者在实践中不太可能超过此值。

     

     

    个人理解:

    上面的描述非常复杂,我总结来说,有一下几种情况需要在开发中注意:

    • Producter发送消息之后,没有收到Broker的confirm:

    消息可能终止在了传送的层面,如操作系统缓冲层,或者网络传输层,或者是在Broker接受之后,由于内部故障不能处理,如exchang故障,也不会发送confirm给Producter。所以,在我们的系统中,我们在producter端其实是有数据库表存储需要发送的消息的,我们一次批量发送100条消息,一旦收到confirm,就会删除这部分消息,所以没有接收到confirm的话,就不删除相应的数据。

    还要保证消息的幂等。如此就可以保证在producter层面不会丢失消息。

    • broker接收到消息之后,在exchang或者queue中丢失:

    设置消息和exchang和queue都为持久的。

    • 找不到消息对应的queue

    我们的程序不会出现这种情况。

    • queue没有对应的consumer

    这种情况下,消息会在queue中挤压,也不会丢失。

    • 消息可能会重复发送,所以需要保证消息处理的幂等性。

     

     

    broker将在下面的情况中对消息进行confirm:

    • broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return) 
    • 非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
    • 持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync) 
    • 持久消息从其所在的所有queue中被consume了(如果必要则会被ack)

     

    批量发送消息,并批量接收确认的例子:

    // 发送持久化消息,消息内容为helloWorld for (long i = 0; i < msgCount; ++i)

    {

    ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "helloWorld".getBytes());

    }

    // 等待所有消息都被ack或者nack,如果某个消息被nack,则抛出IOException

    ch.waitForConfirmsOrDie();

    网上有人做的测试,使用这种批量确认的模式,和使用异步的方式,性能差的不是太多。但是如果使用单条确认,性能将差别数倍。

     

     

  • 相关阅读:
    方法引用(method reference)
    函数式接口
    Lambda 表达式
    LinkedList 源码分析
    ArrayList 源码分析
    Junit 学习笔记
    Idea 使用 Junit4 进行单元测试
    Java 定时器
    【干货】Mysql的"事件探查器"-之Mysql-Proxy代理实战一(安装部署与实战sql拦截与性能监控)
    python-flask框架web服务接口开发实例
  • 原文地址:https://www.cnblogs.com/xiaolang8762400/p/7471211.html
Copyright © 2011-2022 走看看