zoukankan      html  css  js  c++  java
  • 【rabbitmq】rabbitmq概念解析--消息确认--示例程序

    概述

    本示例程序全部来自rabbitmq官方示例程序,rabbitmq-demo
    官方共有6个demo,针对不同的语言(如 C#,Java,Spring-AMQP等),都有不同的示例程序;
    本示例程序主要是Spring-AMQP的参考示例,如果需要其他语言的参考示例,可以参考官网;

    rabbitmq模拟器

    模拟器

    rabbitmq简介

    核心架构图

    核心架构图
    数据流转图
    架构图

    AMQP 0-9-1 Model Explained

    重要语法说明

    • producer或publisher: 消息生产者/发布者,即:产生消息的;
    • Exchange:producer或publisher只会将message发送到Exchange,目前有4种不同的Exchange类型;
    • Queue:消息队列,所有的消费者都是直接从Queue获取Message并消费;
    • Binging:连接Exchange和Queue的纽带,决定Exchange如何路由消息到不同的Queue;
    • routingKey:生产者-->message-->Exchange,需要指定一个key,叫做routingKey;
    • routingKey:Exchange-->Binging-->Queue,Binging有一个Key值,叫routingKey或bingingKey;
    • bingingKey:Exchange-->Binging-->Queue,Binging有一个Key值,bingingKey;

    核心理解

    4种不同的Exchange,对routingKey的解释都不相同;
    对routingKey的不同解释,决定了Exchange路由Message到Queue的不同方案;

    1. direct exchange: 匹配2个routingKey(即routingKey和bingingKey)是否相等,相等时才进行消息路由;
    2. fanout exchange: 忽略routingKey,会将Message路由到所有绑定的Queue;
    3. topic exchange: routingKey格式形如aaa.bbb.xxx*.ccc.dd.#,类似正则表达式匹配;
    4. headers exchange:

    jar包说明

    • Java版本:
      Java版本使用如下jar(说明:若是使用):
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    demo1: 单生产者-单消费者

    单生产者-单消费者官方示例

    spring.profiles.active=hello-world, sender, receiver
    

    demo2: 单生产者-多消费者

    Work queues官方示例
    workQueue
    application.properties配置

    spring.profiles.active=work-queues, sender, receiver
    #spring.profiles.active=work-queues, sender
    #spring.profiles.active=work-queues, receiver
    

    详细描述参见:单生产者-多消费者详细


    demo3: 发布/订阅

    Publish/Subscribe官方示例
    发布/订阅

    • 消费广播到多个消费者进行消费;
    • 使用fanout pattern;

    application.properties配置

    spring.profiles.active=pub-sub, receiver , sender 
    

    详细描述参见:发布/订阅详细


    demo4: Routing

    Routing官方示例

    Direct exchange 模式进行route结构图

    direct-exchange
    a message goes to the queues whose binding key exactly matches the routing key of the message;(相等时才路由)

    Multiple bindings

    Multiple bindings
    两个Queue使用相同的BingingKey(black) ==> 效果类似于:发布/订阅模式(demo3);

    完整的结构图

    together

    application.properties配置

    pring.profiles.active=routing, receiver , sender  
    

    详细描述参见:发布/订阅详细


    demo5: Topics

    Topics官方示例
    结构示例图

    • 使用 Topic exchange实现;
    • 发送到Topic exchange的routingKey必须满足一定要求:用"."分割的words列表,如:*.aaa.bbb.#
    • BingingKey和routingKey有相同的格式要求;
    • * : 可以匹配一个word;
    • #: 可以匹配0个或多个words;

    application.properties配置

    pring.profiles.active=topics, receiver , sender 
    

    详细描述参见:Topics


    demo6: RPC over RabbitMQ

    RPC官方示例

    结构图

    架构图

    application.properties配置

    spring.profiles.active=rpc,server
    #spring.profiles.active=rpc,client
    

    详细描述参见:RPC


    消费端确认

    Delivery Identifiers: Delivery Tags

    消费者注册后,rabbitmq将消息交付给消费者时,都会带有一个“Delivery Tags”,这个是唯一的ID标识,id以整数的递增的方式实现。

    Acknowledgement Modes(消费端)

    自动确认模式

    • 发送之后,就认为是发送成功(fire-and-forget)
    • 消息不停的发送到消费端消费,无需等待消费端任何确认;

    缺点:

    • 可能造成消费端不堪重负;

    手动模式

    1. basic.ack: 肯定的确认;
    2. basic.nack: 否定的确认(RabbitMQ对AMQP 0-9-1的扩展),支持消息批量确认;
    3. basic.reject:否定的确认,消息消费失败后,直接从broker中将消息delete不支持批量确认

    Acknowledging Multiple Deliveries at Once(消息批量确认)

    • 一次确认多个消息发送,而不是每一个消息单独确认;
    • basic.reject:不具备该功能;
    • basic.nack: 具备该功能;

    实现方式

    • multiple field: 设置为true;

    示例

    假设:在Channel(ch)上有5,6,7,8这4个delivery tags未确认;

    • 情况1,delivery_tag=8 & multiple=true: 则5,6,7,8这4个tags都将被确认;
    • 情况2,delivery_tag=8 & multiple=false:则只有8被确认,而5,6,7将不会被被确认;

    Channel Prefetch Count (QoS)[可以设置消费端消费的速率]

    • 消息消费是异步完成的,手动确认也是异步的;
    • 有一部分消息是被消费了,但是还未来得及确认:希望控制未被确认消息的size,防止无界的缓存
    • prefetch count:使用basic.qos方法设置该值可以控制未被确认消息的max size;
    • 当达到该最大值时,rabbitmq将停止交付消息进行消费;
    • 仅对basic.qos方法有效,对basic.get方法无效;

    示例

    假设:在Channel(Ch)上有5,6,7,8共4个未被确认的消息,且ch的prefetch count=4
    结果:rabbitmq将不会再交付任何消息到该Channel上,除非有消息被确认;

    消费确认选择,prefetch设置以及吞吐量

    • 情况1:增大prefetch:提高向消费者传递消息的速度;
    • 情况2:自动确认模式可以产生最佳的传送速率;

    应避免:

    1. 自动确认模式
    2. 手动确认模式 + 无限制的prefetch

    结论:

    • 情况1情况2都可能导致交付但未来得及处理的Message增加,增大RAM的消耗;

    推荐值:

    • prefetch: 100~300,可以有效提高吞吐量,并避免RAM消耗过多的风险;

    消费失败或连接中断: 自动重新reQueue

    当消息发送给消费端后,如果出现如下情况,则消息会重新reQueue,会被再次发送;

    1. TCP连接中断;
    2. 消费端挂掉:无法进行消息确认;

    Client Errors: Double Acking and Unknown Tags

    消费端无法对同一个消息确认超过一次,当超过一次之后,将抛出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX

    总结

    • 每个交付给消费端的消息,都有一个唯一的标识delivery tag
    • 自动消息确认;
    • 手动消息确认:每个消息单独确认批量消息确认;
    • prefetchCount:可以控制消息端的吞吐量,避免消费端消费过慢,产生RAM大量消耗;
    • 失败重传:TCP连接中断消费端挂掉,都会引起消息重新入队列,重新消费(手动消息确认时);
    • 无法对同一个消息进行2次或2次以上的确认,否则会抛出异常;

    发送端确认

    Channel事务

    • 不推荐使用: 会严重降低吞吐量;

    在 AMQP 0-9-1中,保证消息不丢失的唯一方法,就是使用事务;

    1. 开启Channel事务;
    2. 发送消息,提交事务;

    类似消费端的应答确认机制

    • confirm.select: 应用于Channel时,表示使用确认模式
    • 事务确认模式无法共存:二者只能选择其一;

    确认模式 (confirm.select)

    • 发送端使用confirm.select;
    • broker发送basic.ack来确认Message已被处理;
    • delivery-tag: 消息序列,具有唯一性;
    • multiple=true: 用于设置批量消息确认
    • 无法保证消息何时被确认;
    • 确认模式:消息要么被confirmed(OK),要么被nack(fail),且only once;

    Java示例:(发送端发送大量messages,使用确认模式)
    程序-确认模式

    否定确认

    异常情况时,服务端无法处理消息,则broker发送basic.nack来进行否定确认

    应答延时和持久化消息

    • 仅当消息被持久化到disk之后,才会发送basic.ack应答;
    • 吞吐量提高建议:异步处理应答批量发送消息;

    应答顺序

    当使用异步发送和持久化消息时,broker对消息的确认顺序可能和发送者的消息发送顺序不一致;

    发送确认 + 保证交付

    • 消息持久化: 并不能保证消息不丢失(在写入disk前broker就挂掉);

    限制

    Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.

    参考

    Consumer Acknowledgements and Publisher Confirms

  • 相关阅读:
    Java 进制转换
    k-近邻算法实例
    Java JTS & 空间数据模型
    Java中 &&与&,||与|的区别
    http https 区别
    四种DCOM错误的区别,0x80080005 0x800706be 0x80010105 0x
    OPC测试常用的OPCClient和OPCServer软件推荐
    关于TFS2010 远程无法创建团队项目的若干问题总结
    我对NHibernate的感受(4):令人欣喜的Interceptor机制
    我对NHibernate的感受(3):有些尴尬的集合支持
  • 原文地址:https://www.cnblogs.com/ssslinppp/p/8065682.html
Copyright © 2011-2022 走看看