zoukankan      html  css  js  c++  java
  • RabbitMQ 入门【精+转】

    rabbitmq可以用一本书取讲,这里只是介绍一些使用过程中,常用到的基本的知识点。
    官方文档覆盖的内容,非常全面:http://www.rabbitmq.com/documentation.html 。

    1. 介绍

    RabbitMQ,即消息队列系统,它是一款开源消息队列中间件,采用Erlang语言开发,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的标准实现。

    AMQP是一个公开发布的异步消息的规范,是提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    https://www.rabbitmq.com/tutorials/amqp-concepts.html

    相对于JMS(Java Message Service)规范来说,JMS使用的是特定语言的APIs,而消息格式可自由定义,而AMQP对消息的格式和传输是有要求的,但实现不会受操作系统、开发语言以及平台等的限制。

    JMS和AMQP还有一个较大的区别:JMS有队列(Queues)和主题(Topics)两种消息传递模型,发送到 JMS队列 的消息最多只能被一个Client消费,发送到 JMS主题 的消息可能会被多个Clients消费;AMQP只有队列(Queues),队列的消息只能被单个接受者消费,发送者并不直接把消息发送到队列中,而是发送到Exchange中,该Exchage会与一个或多个队列绑定,能够实现与JMS队列和主题同样的功能。

    另外还有一种 MQTT协议,意为消息队列遥测传输,是IBM开发的一个即时通讯协议。由于其维护一个长连接以轻量级低消耗著称,所以常用于移动端消息推送服务开发。MQTT是基于TCP的应用层协议封装,实现了异步Pub/Sub,在物联网(IoT)应用广泛。

    RabbitMQ可通过库、插件的形式,支持JMS和MQTT协议。参考:http://geek.csdn.net/news/detail/71894

    1.1 主要概念

    1. Broker
      接收和分发消息的应用,RabbitMQ Server就是Message Broker

    2. Exchange
      message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct, topic, fanout。
      如果没有队列绑定到exchange上,那么该exchange上的消息都会被丢弃,因为它不存储消息又不知道该怎么处理消息。

    3. Queue
      消息队列载体,每个消息都会被投入到一个或多个队列

    4. Binding
      在exchange和queue之间建立关系就叫Binding,消费者声明队列的时候一般会指定routing_key,也可以叫binding_key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

    5. Routing Key
      这里区分一下binding和routing: binding是一个将exchange和queue关联起来的动作,routing_key可以理解成队列的一个属性,表示这个队列接受符合该routing_key的消息,routing_key需要在发送消息的时候指定。

    6. Vhost
      于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

    7. Producer
      消息生产者,就是投递消息的程序。只负责把消息发送exchange,附带一些消息属性。

    8. Consumer
      消息消费者,就是接受消息的程序。

    9. Channel
      如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。
      Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

    1.2 对比

    rabbitmq
    activemq
    rocketmq
    kafka
    zeromq
    redis

    celery
    待续

    2. 安装配置

    CentOS 6.7,安装3.6.14最新稳定版本:

    1
    2
    3
    4
    wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
    rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
    yum install -y socat

    如果机器上有epel源,先把它禁用掉:enabled=0,否则会默认从这个源按照低版本rabbitmq 。
    如果已安装老版本,可能需要卸载 rpm -qa|grep erlang|awk '{print "yum remove -y "$1}'|sh 。
    继续

    1
    2
    3
    4
    wget http://packages.erlang-solutions.com/rpm/centos/6/x86_64/erlang-20.1-1.el6.x86_64.rpm
    wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.14/rabbitmq-server-3.6.14-1.el6.noarch.rpm

    yum localinstall -y erlang-20.1-1.el6.x86_64.rpm rabbitmq-server-3.6.14-1.el6.noarch.rpm

    确保本地主机名能够正常解析出自己的ip,或 127.0.0.1. (ping rabbitmq-01)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ulimit -S -n 4096
    ulimit -n 65534

    # limits.conf
    cat /etc/security/limits.conf
    * soft nofile 65535
    * hard nofile 65535

    # 从配置文件模板创建配置文件
    sudo cp -a /usr/share/doc/rabbitmq-server-3.6.14/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

    # 启动
    /etc/init.d/rabbitmq-server restart

    默认用户名密码 guest/guest, 具有vhost / 的所有权限,只能在本地访问。
    队列元数据及内容信息,默认在目录 /var/lib/rabbitmq/mnesia 下。

    2.1 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # 启用管理插件
    rabbitmq-plugins enable rabbitmq_management

    # /etc/rabbitmq/rabbitmq.config 配置
    [
    {rabbit,
    [%%
    {tcp_listeners, [5672]},
    {vm_memory_high_watermark, 0.6},
    %% {vm_memory_high_watermark_paging_ratio, 0.5},
    {hipe_compile, true}
    ]},
    {rabbitmq_management,
    [%% Preload schema definitions from a previously exported definitions file. See
    ]}
    ].

    %%是Erlang的注释符号。

    • vm_memory_high_watermark
      RabbitMQ在使用当前机器的40%以上内存时候,会发出内存警告,并阻止RabbitMQ所有连接(producer连接)。这个阈值便由 vm_memory_high_watermark 控制
    • vm_memory_high_watermark_paging_ratio
      当内存中的数据达到一定数量后,他需要被page out出来。比如默认这个ratio=0.5,机器内存8G,于是 memory watermark=0.4 8G几即 3.2G。3.2G paging_raio = 1.6G,当消息挤压的量达到1.6G后,开始paging到磁盘上。
      一搬不去改它。
    • hipe_compile
      开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。
      开启之后,每次启动 rabbitmq-server,需要多花1分钟左右。

    看下 rabbitmqctl status 信息,混个眼熟:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62

    Status of node 'rabbit@rabbitmq-01'
    [{pid,6232},
    {running_applications,
    [{rabbitmq_management,"RabbitMQ Management Console","3.6.14"},
    {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.14"},
    {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.14"},
    {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
    {rabbitmq_consistent_hash_exchange,"Consistent Hash Exchange Type",
    "3.6.14"},
    {rabbitmq_sharding,"RabbitMQ Sharding Plugin","3.6.14"},
    {rabbit,"RabbitMQ","3.6.14"},
    {amqp_client,"RabbitMQ AMQP Client","3.6.14"},
    {rabbit_common,
    "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
    "3.6.14"},
    {os_mon,"CPO CXC 138 46","2.4.3"},
    {mnesia,"MNESIA CXC 138 12","4.15.1"},
    {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
    {compiler,"ERTS CXC 138 10","7.1.2"},
    {recon,"Diagnostic tools for production use","2.3.2"},
    {syntax_tools,"Syntax tools","2.1.3"},
    {crypto,"CRYPTO","4.1"},
    {stdlib,"ERTS CXC 138 10","3.4.2"},
    {kernel,"ERTS CXC 138 10","5.4"}]},
    {os,{unix,linux}},
    {erlang_version,
    "Erlang/OTP 20 [erts-9.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true] "},
    {memory,
    [{connection_readers,0},
    {connection_writers,0},
    {connection_channels,0},
    {connection_other,8864},
    {queue_procs,48686248},
    {queue_slave_procs,0},
    {plugins,14194848},
    {other_proc,12618480},
    {metrics,323944},
    {mgmt_db,12627800},
    {mnesia,701856},
    {binary,22261264},
    {msg_index,634656},
    {allocated_unused,364165712},
    {reserved_unallocated,0},
    {total,596238336}]},
    {alarms,[]},
    {listeners,
    [{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"0.0.0.0"}]},
    {vm_memory_calculation_strategy,rss},
    {vm_memory_high_watermark,0.6},
    {vm_memory_limit,4952820940},
    {disk_free_limit,50000000},
    {disk_free,1626125135872},
    {file_descriptors,
    [{total_limit,65435},
    {total_used,58},
    {sockets_limit,58889},
    {sockets_used,0}]},
    {processes,[{limit,1048576},{used,446}]},
    {run_queue,0},
    {uptime,1232025},
    {kernel,{net_ticktime,60}}]

    2.2 命令行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # 添加新的 vhost
    rabbitmqctl add_vhost /some0
    rabbitmqctl list_vhost

    # 添加登录用户 admin
    rabbitmqctl add_user admin admin
    rabbitmqctl list_users

    # 设置为管理员角色
    rabbitmqctl set_user_tags admin administrator

    # 设置权限
    rabbitmqctl set_permissions -p /some0 admin '.*' '.*' '.*'
    rabbitmqctl list_permissions -p /some0
    rabbitmqctl list_user_permissions admin

    在开始介绍概念之前,先可以从Web UI上来认识一下rabbitmq:
    rabbitmq overview 首页监控面板:
    rabbitmq-overview.png

    rabbitmq 客户端的连接信息:
    rabbitmq-connections.png

    某个channel的详情:
    rabbitmq-channel-info.png

    exchanges信息:
    rabbitmq-exchanges.png

    queues信息:
    rabbitmq-queues-consuming.png

    策略定义:
    rabbitmq-policy.png

    3. Exchange类型

    AMQP 0-9-1 定义了四种内置类型的exchange type: direct, fanout, topic, header。exchange除了类型以外,还可以指定一些属性:

    • Name: 交换器名字。一般以 . 号分隔以作区分
    • Durability: 持久化的exchange在broker重启之后依然存在。相对应是 transient exchange
    • Auto-delete: 如果设置了该属性,在最后一个队列unbound之后,exchange会自动删除
    • Arguments: 可以用在满足插件扩展上

      • alternate-exchange
        RabbitMQ自己扩展的功能,不是AMQP协议定义的。
        Alternate Exchange属性的作用,创建Exchange指定该 x-arguments 的alternate-exchange属性,发送消息的时候根据route key没有找到可以投递的队列,这就会将此消息路由到 Alternate Exchange 属性指定的 Exchange (就是一个普通的exchange)上了。

        比如把MySQL的binlog订阅出来,因为里面有许多表,每个表的dml行数有多有少。我们可以将变更量多的表单独放到一个队列,其它表一起放到一个队列,就可以为原始的exchange添加 alternate-exchange 属性,将其它表的数据重新投递到另一个exchange。

    3.1 fanout

    fanout类型的exchange是最容易理解的,它会把来自生产者的消息广播到所有绑定的queues上。这种情况一般会把消息的routing_key设置为空'',甚至不关心队列的名字。如下图:

    amq.gen-RQ6...amq.gen-As8...是消费者随机生成了两个队列,绑定到fanout exchange上,C1,C2会各自收到一模一样的消息。

    3.2 direct

    direct类型的exchange转发消息到队列里,是直接基于消息的routing key。

    C1在声明队列的时候,指定routing_key=error。C2的队列上绑定了info,error,warning三个key。
    于是error类型的消息会被同时发送到C1,C2(准确的说是两个队列上),而info,warning类型的消息只发送到队列amqp.gen-Agl...

    如果要达到Round-Robin轮询效果,即两个Consumer依次从同一个队列里取消息,那么可以在声明队列的时候指定相同的 queue name,rabbitmq会自动均衡的发送消息给多个Consumer,可水平扩展消费者的处理能力(如果要保证处理顺序,得设置prefetch_count=1)。

    3.3 topic

    topic类型的exchange大大提升了消息路由的灵活性。不像fanout那样无脑的全部转发,也不像direct那样指定所有的routing_key,否则不匹配的key的消息就会被丢弃。
    比如有一个收集日志的系统,模块包括auth/cron/kernel/app1/app2,日志级别包括error,info,warning。现在要把所有模块的error日志规整在一起,可以设计routing_key: <module>.<severity> (auth.error, auth.info, …, app1.error, app1.info…),然后设置queue的binding_key=’*.error’

    topic exchange 会根据 . 划分word,有以下两种正则符号用于匹配routing_key:

    • *: 代表一个word
    • #: 代表0个或多个word

    拿官网的例图来说:<敏捷度>.<颜色>.<物种>

    上图创建了3个bindings:

    • 队列Q1的binding_key=*.orange.*,即对所有橙色的动物感兴趣
    • 队列Q2绑定了*.*.rabbitlazy.#,即订阅了所有和兔子相关的消息,以及反应迟钝的动物

    于是:

    • routing_key为quick.orange.rabbit的消息,会被发送到两个队列
    • routing_key为lazy.orange.elephant的消息,也会被发送到两个队列
    • routing_key为quick.orange.fox的消息,只会发送到Q1
    • routing_key为lazy.brown.fox的消息,只会发送到Q2
    • routing_key为lazy.pink.rabbit的消息,只会发送到Q2。虽然匹配到了lazy.#*.*.rabbit,但只会发送一次
    • routing_key为quick.brown.fox的消息,会被丢弃,因为没有任何绑定的队列得到匹配
    • routing_key为lazy.orange.male.rabbit的消息,还是会发送到Q2,因为 lazy.#
      然而orangequick.orange.male.rabbit,也破坏了约定,但没得到匹配,消息丢弃。
    • routing_key为#,接受所有消息,相当于fanout exchange
    • routing_key没有*#时,相当于direct exchange

    3.4 headers

    header类型的exchange用的不多,是在routing_key不能满足使用场景的情况下(如routing_key必须是字符串),在消息的头部加入一个或多个key/value,然后在声明队列的时候也指定要绑定的header。

    binding的时候有个参数x-match,指定headers所有的k/v都要匹配成功(all)还是任意一个匹配则接受(any)。

    3.5 x-consistent-hash

    这是个第三方插件形式存在的exchange,目前已内置于rabbitmq:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange

    x-consistent-hash类型的exchange可以根据routing_key,用一致性哈希算法,将消息路由到不同的队列上。它可以尽可能的保证每个队列上的消息数量相同,也可以随时添加更多的队列来“分流”,并且能保证同一个routing_key会进入相同的queue。

    要达到这样的效果,queue routing key必须是一个字符串类型的数字。比如Q1:routing_key=’10’, Q2:routing_key=’20’,那么消息就会按照1:2的比例,发送到Q1,Q2。

    3.6 x-modulus-hash

    第三方插件形成存在的exchange,从3.6.0版本开始,也内置到了rabbitmq发行版:https://github.com/rabbitmq/rabbitmq-sharding

    x-modulus-hash类型的exchange与 x-consistent-hash 很像,也叫 sharding exchange,即将message在多个队列之间进行分区发送。它的实现方法是根据 routing_key 先获得hash,再用 Hash mod N 得到队列,N就是绑定到exchange上的队列个数。

    4. Queue属性

    Queue 要先于 Exchange 创建,否则生产者发布的消息,在没有绑定队列之前,会丢失。
    已存在的Queue可以重复declare,但前提是属性要相同。

    • Name: 队列名称。可以在应用里面指定,或者交给broker生成
    • Durable:持久化的Queue在broker重启之后,依然存在。
      注意,这里的持久化与消息持久无关。是个 property
    • Exclusive: 为True时,表示当Consumer的Connection端口之后,队列自动删除。一般由broker生成的随机队列名,指定这个选项 。
      排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的
    • Auto-delete: 当最后一个consumer取消订阅之后,队列自动删除

    • Arguments: 设置可选的一些参数,如

      • x-message-ttl
        消息在队里里最大存活时间,超过这个ttl就会被丢弃。单位毫秒

      • x-max_length
        队列里最多容纳的消息个数,超过这个值,则会从队列头部drop掉消息

      • x-max-priority
        设置了这个参数,就表示这是一个具有优先级的队列。它的值是可定义的优先级最大值,一般10以内就够了。
        在生产商Publish消息的时候,消息Property上可设置Priority

      • x-queue-mode
        这个参数是控制是否为”延迟队列”,Lazy Queue是在3.6.0引入的,它会尽量把消息存在磁盘上,节省内存
        RabbitMQ一开始的设计初衷,是做异步、解耦,所以会把消息放在内存里面,以便快速的发送给消费者(持久化类型的消息会同时存在于磁盘和内存缓存中)。

        如果用它来暂时存放大量消息,而不消费或者消费太慢,会导致性能明显下降,因为为了释放内存,消息得swap到磁盘上 —— 会阻塞队列接收新消息。如果内存使用达到broker设置的 water-mark,也会拒绝接收新消息。
        Lazy Queue(x-queue-mode=lazy)的作用就是一接收到新消息,马上存到文件系统,完全避免了前面提到的内存占用。这会增加磁盘I/O(顺序的),与处理持久化类型的消息很相似。

      • x-dead-letter-exchange
        死信。当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:

        • 消息被拒绝(basic.reject or basic.nack)并且requeue=false
        • 消息TTL过期
        • 队列达到最大长度

        DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列。
        死信被重新 requeue 时,可以改变它的routing_key,以便新的队列处理,routing_key用x-dead-letter-routing-key指定,如果不指定则继续使用消息原来的routing_key。

    5. Message属性

    • routing_key
      路由关键字,exchange根据这个关键字进行消息投递
    • delivery_mode
      • 1: Non-persistent,消息不持久化到磁盘,尽快被消费掉。重启broker之后消息丢失
      • 2: Persistent,消息持久化。当然被取走的消息,也就不存在了
    • headers
      消息头信息,key/value形式,可以认为给消息打上了各种各样的标签。可用于代替 routing_key 去路由(结合headers来下的exchange),或者第三方插件使用。
    • properties
      实际上 headers 和 delivery_mode 也是properties的一部分,因为使用较多,所以单独拿出去。这里也只提几个:
      • priority
        消息优先级。数字,优先级高的消息会排在队列头部
      • correlation_id 和 reply_to
        这两个一般用于实现服务间RPC调用, 即生产者发起请求到rabbitmq队列,等待处理结果返回,消费者处理完消息后返回结果给调用方。
        reply_to 在消息里面告诉消费者,处理完的结果放到哪个队列,调用方根据 correlation_id 找到结果。详情参考 https://www.rabbitmq.com/tutorials/tutorial-six-python.html
      • expiration
        消息自身的Time-To-Live,用的较少,也叫 Per-Message TTL In Publisher.
        前面提到,队列的arguemnts可以设置 x-message-ttl ,也叫 Per-Queue Message TTL In Queues.消息是否过期以两者的最小值为准,并且消息自身过期时间到了之后,不会自动从队列删除,而是在发送给消费者的时候丢弃。
        队列自身也有个 x-expires,它指的是队列在多久没有消费者连上来,超过这个时间后队列自动删除。
    • payload: 消息正文

    6. 插件

    RabbitMQ支持插件式的来扩展功能。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    列举server上安装的所有插件
    # rabbitmq-plugins list
    Configured: E = explicitly enabled; e = implicitly enabled
    | Status: * = running on rabbit@rabbitmq-01
    |/
    [e*] amqp_client 3.6.14
    [e*] cowboy 1.0.4
    [e*] cowlib 1.0.2
    [ ] rabbitmq_amqp1_0 3.6.14
    [ ] rabbitmq_auth_backend_ldap 3.6.14
    [ ] rabbitmq_auth_mechanism_ssl 3.6.14
    [E*] rabbitmq_consistent_hash_exchange 3.6.14
    [ ] rabbitmq_event_exchange 3.6.14
    [ ] rabbitmq_federation 3.6.14
    [ ] rabbitmq_federation_management 3.6.14
    [ ] rabbitmq_jms_topic_exchange 3.6.14
    [E*] rabbitmq_management 3.6.14
    [e*] rabbitmq_management_agent 3.6.14
    [ ] rabbitmq_management_visualiser 3.6.14
    [ ] rabbitmq_mqtt 3.6.14
    [ ] rabbitmq_random_exchange 3.6.14
    [ ] rabbitmq_recent_history_exchange 3.6.14
    [E*] rabbitmq_sharding 3.6.14
    [ ] rabbitmq_shovel 3.6.14
    [ ] rabbitmq_shovel_management 3.6.14
    [ ] rabbitmq_stomp 3.6.14
    [ ] rabbitmq_top 3.6.14
    [ ] rabbitmq_tracing 3.6.14
    [ ] rabbitmq_trust_store 3.6.14
    [e*] rabbitmq_web_dispatch 3.6.14
    [ ] rabbitmq_web_mqtt 3.6.14
    [ ] rabbitmq_web_mqtt_examples 3.6.14
    [ ] rabbitmq_web_stomp 3.6.14
    [ ] rabbitmq_web_stomp_examples 3.6.14
    [ ] sockjs 0.3.4

    启用插件
    # rabbitmq-plugins enable plugin-name

    下面是几个常用插件:

    1. rabbitmq_management
      管理 rabbitmq server 的插件,提供给予HTTP的API和 WebUI,提供管理exchanges、管理queues、管理users、管理policies,监控,发布/接收消息。功能强大,基本是必定开启的插件。
      开启管理插件后,也可以选择不使用Web界面,从 http://localhost:15672/cli/rabbitmqadmin 下载 rabbitmqadmin 命令行工具,它用在一些脚本里面会很方便。(提示: rabbitmqctl 是不能创建exchange和queue,但rabbitmqadmin可以)

    2. rabbitmq_federation
      与MySQL Federated 存储引擎很相似,可以认为 federated exchange 是其它exchange(也叫upstream exchange)的“软连接”、“流量复制”。消息是被publish到上游exchange,然后消费者是从其它broker上的federated exchange订阅消息。
      Federated exchanges/queues 是通过 AMQP 协议的Erlang客户端从真实broker里面取数据(不会消费源数据),可以实现跨网络的消息提取,或者将不同地方的消息汇总到一处。应用场景有 broker / cluster 数据迁移,模仿真实数据的线下测试。

    3. rabbitmq_shovel
      shovel插件就是一个 消费者 + 生产者:从一个queue消费内容,发送到另一个exchange上,甚至可以对消息做些转换。你可以自己实现将消息从源broker消费,重新publish到另一个exchange,但shovel帮我们做好了。

    4. rabbitmq_mqtt
      实现了 MQTT 3.1 协议的adapter,如文章开头所述。

    5. rabbitmq_consistent_hash_exchange
      一致性hash exchange,如前文所述。

    6. 策略 Policy

    首先为什么rabbitmq会有策略这个东西。

    前面我们讲到,queue和exchange有一些固定属性,如durableexclusiveauto-delete等,还有一些可选参数,也叫x-arguments,如x-max-lengthx-queue-mode。这些都是客户端在定义队列和交换器时指定的。

    如果事后想修改 TTL 或者 queue length limit ,那么得修改应用、重新部署,甚至涉及到删除队列,重新declare。Policy就是解决这个痛点的,在服务端对匹配的 exchanges 或者 queues 设置参数,无需动应用。更多请参考 https://www.rabbitmq.com/parameters.html

    一个 policy 包含以下内容:

    • name: 策略名字
    • pattern: 对哪些queues(exchanges)的应用策略,正则表达式
    • definition: 策略内容定义,key/value形式(也可以认为是JSON格式)
    • apply-to: 策略应用在什么身上,queuesexchangesall。默认是all
    • priority: 策略优先级,默认0

    每个exchange/queue只能“注入”一个policy,所以如果要设置多个策略,把key/value组合成json,定义在一起。设置完成会马上生效,包括后面新创建的exchange、queues。

    1
    2
    3
    4
    5
    6
    7
    8
    将exchange设置为 alternate exchange:(策略名:AE)
    rabbitmqctl set_policy -p /some0 AE "^maxwell.some3$" '{"alternate-exchange":"maxwell.AE"}' --apply-to exchanges

    将vhost /some0 的所有队列都设置成 Lazy Queue
    rabbitmqctl set_policy -p /some0 Lazy "^" '{"queue-mode":"lazy"}' --apply-to queues

    队列名匹配 'two-messages' 的队列,设置最大队列消息数为2,超过之后的行为是 禁止接收新消息(与之对应的是 drop-head: 删除头部老的消息)
    rabbitmqctl set_policy my-pol "^two-messages$" '{"max-length":2,"overflow":"reject-publish"}' --apply-to queues

    7. 消息可靠性

    有的系统要保证消息不允许丢失,甚至不允许重复,有的系统追求的是高性能,所以要在性能和可靠性之间权衡。rabbitmq在多个层面提供消息可靠性保证。

    7.1 持久化

    声明持久化的exchange: channel.exchange_delcare(exchange_name, durable)
    声明持久化的队列:channel.queueDeclare(queue_name, durable, exclusive, auto_delete, arguments)
    发布的持久化消息,投递模式为2: delivery_mode=2

    http://www.rabbitmq.com/reliability.html
    persistent

    7.1 ack & confirm

    持久化保证了在broker或者机器出现异常的时候,消息不会丢失,要保证发送者在pub消息、接收sub消息时出现网络异常,客户端也应该有相应的处理。

    Consumer Delivery Acknowledgements

    rabbitmq对Consumer处理消息提供 acknowledgements 确认机制,客户端通过basic.consume注册到broker(push),或者通过basic.get pull 消息,都可以在指定是否开启ack

    delivery tags是实现 ack 的关键,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag,它是单调递增的正整数,在一个channel中唯一代表了一次投递。

    确认模式包括自动确认和手动确认。
    自动确认就是rabbitmq一旦把消息发送出去后,就认为成功,完成确认。此模式性能最高,只要消费者能处理的过来,但自然降低了消息到达处理的可靠性,比如一个消息还在路上,消费者的TCP连接或者channel就关闭了,那么消息也就丢失。如果消费者处理不过来,可能会导致消息在客户端挤压,内存过载,引发异常。所以自动确认一般用在消息比较平稳、客户端能处理的来的系统。

    手动确认,就是客户端需要自己发送确认命令,包括:

    • basic.ack —— 确认成功,客户端成功处理
    • basic.nack —— 确认失败,客户端处理失败,但依然删掉消息
    • basic.reject —— 确认失败,客户端处理失败,消息不删除,可重新发送。

    手动确认模式,可以控制消息处理的速度(流控QoS),通过 prefetch 设置该channel上最大没有确认的消息数,server会等待有空闲的配额时才继续发送给消费者。
    手动确认模式如果不设置 prefetch_count,那么消费者可能会接收许多的消息但未ack,从而导致内存耗尽,所以这点需要小心。正常来说,100-300是个比较可控的范围。(当然如果是 pull 模式,就不存在QoS一说)

    basic.ackbasic.nack可以设置 multiple 字段,批量确认来减少网络传输。比如说在信道 ch 上有 delivery tags 5, 6, 7, 8 没有确认,当客户端发回的确认帧是8并且 multiple=true,那么5-8的tags都被ack。

    在启用手动确认时,发生网络连接断开或者消费者崩溃,而无法返回 ack/nack 命令时,(检测方法是 heartbead)rabbitmq会自动将没有确认的消息 requeue,所以客户端处理消息时,最好能满足幂等性,即能够重复处理这些消息。

    Publisher Confirms

    rabbitmq对Producer发布消息提供 confirm 机制:客户端可以发送一个 confirm.select 命令将channel设置成confirm工作模式。
    所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(basic.ack),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

    如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息,确保消息不会再发送之前就丢失。

    然后对于需要持久化的消息的确认,不能完全保证数据被刷到磁盘上,因为每个消息调用 fsync 的带来的IO代价太高,rabbitmq会每隔几百毫秒,批量将消息从文件系统缓存 fsync 刷到磁盘。(了解MySQL的话对这个应该不陌生)

    7.2 事务

    RabbitMQ 实现了AMQP 0-9-1协议里的事务,这样说唯一能确保消息不丢失的方式,信道可以设置成 transaction 模式:发布消息,commit/rollback消息。

    但是事务在这里太重了,而且会极大的降低性能。不用。

    7.3 rabbitmq分布式

    待聊

    5. python使用示例

    https://pika.readthedocs.io/en/0.10.0/intro.html

    下面的示例是使用Maxwell或者MySQL binlog增量流,json数据进入rabbitmq,然后通过 pika —— python版本的rabbitmq client,重新组装成sql,达到数据增量同步的效果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    def binlog_sync(self):
    logger.info("connect to rabbitmq server [%s], vhost=%s", rabbitmq_conn_info.get('host'), rabbitmq_conn_info.get('vhost', '/'))
    ## rabbitmq 用户认证信息
    credentials = pika.PlainCredentials(rabbitmq_conn_info.get('user', 'guest'),
    rabbitmq_conn_info.get('password', 'guest')
    )
    ## rabbitmq tcp连接
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host=rabbitmq_conn_info.get('host'),
    port=rabbitmq_conn_info.get('port', 5672),
    virtual_host=rabbitmq_conn_info.get('vhost', '/'),
    credentials=credentials
    )
    )
    ## rabbitmq 信道,避免频繁tcp断连
    channel = connection.channel()

    # exchange_name = 'maxwell.some' + str(self.corpmod)
    # exchange_other = 'maxwell.AE'
    logger.info("declare mq exchange [%s], type=[%s]", self.exchange_name, self.exchange_type)
    ## 创建 exchange,如果已经存在相同名字,就不会重复创建,但要求属性要相同
    ## 指定exchange_type,durable, arguments 。这里的alternate-exchange放到策略里从创建,因为目前maxwell作为消费者,没有支持arguemnts参数
    channel.exchange_declare(exchange=self.exchange_name,
    exchange_type=self.exchange_type,
    durable=True,
    # arguments={'alternate-exchange': exchange_other}
    )

    """
    channel.exchange_declare(exchange=exchange_other, exchange_type='topic', durable=True) # alternative exchange
    channel.queue_declare(queue='ae_other', durable=True)
    channel.queue_bind(exchange=exchange_other,
    queue='ae_other',
    routing_key='d_ec_some.*')
    """
    logger.info("declare queue name=[%s]", self.queue_name)
    ## 创建 queue,如果以经存在相同名字的队列,则不会创建,但要求属性相同,否则报错
    ## 指定了 lazy queue
    channel.queue_declare(queue=self.queue_name, durable=True, arguments={'x-queue-mode': 'lazy'})

    ## 将routing_key 绑定到队列上
    for key in self.queue_bind_key:
    logger.info("bind routing_key [%s] to queue [%s]", key, self.queue_name)
    channel.queue_bind(exchange=self.exchange_name,
    queue=self.queue_name,
    routing_key=key)

    # consume callback, internal
    ## 客户端处理消息
    def callback(ch, method, properties, body):
    # print(" [x] Received %s" % body)
    logger.debug("Received message: %s", body)
    try:
    data_row = json.loads(body.decode('utf-8'))
    self.process_data(data_row)

    if ret == -2: # requeue
    ## 处理异常,如Ctrl+C断开,重新排队
    logger.warning("message data: %s (requeue)", data_row)
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    # return
    except ValueError as e:
    logger.error("proces Error: %s(skip)", e)
    logger.error(" received data: %s", body)
    ## 处理异常,但跳过
    ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
    logger.error("proces Error: %s(skip)", e)
    logger.error(" message data: %s", data_row)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
    ## 发送确认成功
    ch.basic_ack(delivery_tag=method.delivery_tag)

    ## 设置最多 50 个未确认
    channel.basic_qos(prefetch_count=50)

    # 开始消费,拿到的消息调用callback处理
    channel.basic_consume(callback, queue=self.queue_name, no_ack=False)

    # print(' [*] Waiting for messages. To exit press CTRL+C')
    logger.info("start comsuming")

    参考


    原文连接地址:http://seanlook.com/2018/01/06/rabbitmq-introduce/

  • 相关阅读:
    CodeForce VKcup A
    CNN卷积神经网络
    神经网络
    我的机器学习之路
    [OPENCV] 第一个程序 识别颜色
    Android 登录界面与首页的设计
    go web的基本原理
    Go语言标准库之http/template
    吞吐量(TPS)、QPS、并发数、响应时间(RT)概念
    数据库恢复技术
  • 原文地址:https://www.cnblogs.com/sunsky303/p/8977138.html
Copyright © 2011-2022 走看看