rabbitmq可以用一本书取讲,这里只是介绍一些使用过程中,常用到的基本的知识点。
官方文档覆盖的内容,非常全面:http://www.rabbitmq.com/documentation.html 。
1. 介绍
RabbitMQ,即消息队列系统,它是一款开源消息队列中间件,采用Erlang语言开发,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的标准实现。
AMQP是一个公开发布的异步消息的规范,是提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
https://www.rabbitmq.com/tutorials/amqp-concepts.html
相对于JMS(Java Message Service)规范来说,JMS使用的是特定语言的APIs,而消息格式可自由定义,而AMQP对消息的格式和传输是有要求的,但实现不会受操作系统、开发语言以及平台等的限制。
JMS和AMQP还有一个较大的区别:JMS有队列(Queues)和主题(Topics)两种消息传递模型,发送到 JMS队列 的消息最多只能被一个Client消费,发送到 JMS主题 的消息可能会被多个Clients消费;AMQP只有队列(Queues),队列的消息只能被单个接受者消费,发送者并不直接把消息发送到队列中,而是发送到Exchange中,该Exchage会与一个或多个队列绑定,能够实现与JMS队列和主题同样的功能。
另外还有一种 MQTT协议,意为消息队列遥测传输,是IBM开发的一个即时通讯协议。由于其维护一个长连接以轻量级低消耗著称,所以常用于移动端消息推送服务开发。MQTT是基于TCP的应用层协议封装,实现了异步Pub/Sub,在物联网(IoT)应用广泛。
RabbitMQ可通过库、插件的形式,支持JMS和MQTT协议。参考:http://geek.csdn.net/news/detail/71894
1.1 主要概念
-
Broker
接收和分发消息的应用,RabbitMQ Server就是Message Broker -
Exchange
message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct, topic, fanout。
如果没有队列绑定到exchange上,那么该exchange上的消息都会被丢弃,因为它不存储消息又不知道该怎么处理消息。 -
Queue
消息队列载体,每个消息都会被投入到一个或多个队列 -
Binding
在exchange和queue之间建立关系就叫Binding,消费者声明队列的时候一般会指定routing_key,也可以叫binding_key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。 -
Routing Key
这里区分一下binding和routing: binding是一个将exchange和queue关联起来的动作,routing_key可以理解成队列的一个属性,表示这个队列接受符合该routing_key的消息,routing_key需要在发送消息的时候指定。 -
Vhost
于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等 -
Producer
消息生产者,就是投递消息的程序。只负责把消息发送exchange,附带一些消息属性。 -
Consumer
消息消费者,就是接受消息的程序。 -
Channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。
Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
1.2 对比
rabbitmq
activemq
rocketmq
kafka
zeromq
redis
celery
待续
2. 安装配置
CentOS 6.7,安装3.6.14最新稳定版本:
1
|
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
|
如果机器上有epel源,先把它禁用掉:enabled=0,否则会默认从这个源按照低版本rabbitmq 。
如果已安装老版本,可能需要卸载 rpm -qa|grep erlang|awk '{print "yum remove -y "$1}'|sh
。
继续
1
|
wget http://packages.erlang-solutions.com/rpm/centos/6/x86_64/erlang-20.1-1.el6.x86_64.rpm
|
确保本地主机名能够正常解析出自己的ip,或 127.0.0.1. (ping rabbitmq-01)
1
|
ulimit -S -n 4096
|
默认用户名密码 guest
/guest
, 具有vhost /
的所有权限,只能在本地访问。
队列元数据及内容信息,默认在目录 /var/lib/rabbitmq/mnesia
下。
2.1 配置
1
|
# 启用管理插件
|
%%
是Erlang的注释符号。
- vm_memory_high_watermark
RabbitMQ在使用当前机器的40%以上内存时候,会发出内存警告,并阻止RabbitMQ所有连接(producer连接)。这个阈值便由vm_memory_high_watermark
控制 - vm_memory_high_watermark_paging_ratio
当内存中的数据达到一定数量后,他需要被page out出来。比如默认这个ratio=0.5,机器内存8G,于是 memory watermark=0.4 8G几即 3.2G。3.2G paging_raio = 1.6G,当消息挤压的量达到1.6G后,开始paging到磁盘上。
一搬不去改它。 - hipe_compile
开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。
开启之后,每次启动 rabbitmq-server,需要多花1分钟左右。
看下 rabbitmqctl status
信息,混个眼熟:
1
|
|
2.2 命令行
1
|
# 添加新的 vhost
|
在开始介绍概念之前,先可以从Web UI上来认识一下rabbitmq:
rabbitmq overview 首页监控面板:
3. Exchange类型
AMQP 0-9-1 定义了四种内置类型的exchange type: direct, fanout, topic, header。exchange除了类型以外,还可以指定一些属性:
- Name: 交换器名字。一般以
.
号分隔以作区分 - Durability: 持久化的exchange在broker重启之后依然存在。相对应是 transient exchange
- Auto-delete: 如果设置了该属性,在最后一个队列unbound之后,exchange会自动删除
-
Arguments: 可以用在满足插件扩展上
-
alternate-exchange
RabbitMQ自己扩展的功能,不是AMQP协议定义的。
Alternate Exchange属性的作用,创建Exchange指定该x-arguments
的alternate-exchange
属性,发送消息的时候根据route key没有找到可以投递的队列,这就会将此消息路由到 Alternate Exchange 属性指定的 Exchange (就是一个普通的exchange)上了。比如把MySQL的binlog订阅出来,因为里面有许多表,每个表的dml行数有多有少。我们可以将变更量多的表单独放到一个队列,其它表一起放到一个队列,就可以为原始的exchange添加 alternate-exchange 属性,将其它表的数据重新投递到另一个exchange。
-
3.1 fanout
fanout类型的exchange是最容易理解的,它会把来自生产者的消息广播到所有绑定的queues上。这种情况一般会把消息的routing_key设置为空''
,甚至不关心队列的名字。如下图:
amq.gen-RQ6...
和amq.gen-As8...
是消费者随机生成了两个队列,绑定到fanout exchange上,C1,C2会各自收到一模一样的消息。
3.2 direct
direct类型的exchange转发消息到队列里,是直接基于消息的routing key。
C1在声明队列的时候,指定routing_key=error。C2的队列上绑定了info,error,warning三个key。
于是error类型的消息会被同时发送到C1,C2(准确的说是两个队列上),而info,warning类型的消息只发送到队列amqp.gen-Agl...
。
如果要达到Round-Robin轮询效果,即两个Consumer依次从同一个队列里取消息,那么可以在声明队列的时候指定相同的 queue name,rabbitmq会自动均衡的发送消息给多个Consumer,可水平扩展消费者的处理能力(如果要保证处理顺序,得设置prefetch_count=1)。
3.3 topic
topic类型的exchange大大提升了消息路由的灵活性。不像fanout那样无脑的全部转发,也不像direct那样指定所有的routing_key,否则不匹配的key的消息就会被丢弃。
比如有一个收集日志的系统,模块包括auth/cron/kernel/app1/app2,日志级别包括error,info,warning。现在要把所有模块的error日志规整在一起,可以设计routing_key: <module>.<severity> (auth.error, auth.info, …, app1.error, app1.info…),然后设置queue的binding_key=’*.error’
topic exchange 会根据 .
划分word,有以下两种正则符号用于匹配routing_key:
*
: 代表一个word#
: 代表0个或多个word
拿官网的例图来说:<敏捷度>.<颜色>.<物种>
上图创建了3个bindings:
- 队列Q1的binding_key=
*.orange.*
,即对所有橙色的动物感兴趣 - 队列Q2绑定了
*.*.rabbit
和lazy.#
,即订阅了所有和兔子相关的消息,以及反应迟钝的动物
于是:
- routing_key为
quick.orange.rabbit
的消息,会被发送到两个队列 - routing_key为
lazy.orange.elephant
的消息,也会被发送到两个队列 - routing_key为
quick.orange.fox
的消息,只会发送到Q1 - routing_key为
lazy.brown.fox
的消息,只会发送到Q2 - routing_key为
lazy.pink.rabbit
的消息,只会发送到Q2。虽然匹配到了lazy.#
和*.*.rabbit
,但只会发送一次 - routing_key为
quick.brown.fox
的消息,会被丢弃,因为没有任何绑定的队列得到匹配 - routing_key为
lazy.orange.male.rabbit
的消息,还是会发送到Q2,因为lazy.#
然而orange
、quick.orange.male.rabbit
,也破坏了约定,但没得到匹配,消息丢弃。 - routing_key为
#
,接受所有消息,相当于fanout exchange - routing_key没有
*
和#
时,相当于direct exchange
3.4 headers
header类型的exchange用的不多,是在routing_key不能满足使用场景的情况下(如routing_key必须是字符串),在消息的头部加入一个或多个key/value,然后在声明队列的时候也指定要绑定的header。
binding的时候有个参数x-match
,指定headers所有的k/v都要匹配成功(all
)还是任意一个匹配则接受(any
)。
3.5 x-consistent-hash
这是个第三方插件形式存在的exchange,目前已内置于rabbitmq:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange
x-consistent-hash
类型的exchange可以根据routing_key,用一致性哈希算法,将消息路由到不同的队列上。它可以尽可能的保证每个队列上的消息数量相同,也可以随时添加更多的队列来“分流”,并且能保证同一个routing_key会进入相同的queue。
要达到这样的效果,queue routing key必须是一个字符串类型的数字。比如Q1:routing_key=’10’, Q2:routing_key=’20’,那么消息就会按照1:2的比例,发送到Q1,Q2。
3.6 x-modulus-hash
第三方插件形成存在的exchange,从3.6.0版本开始,也内置到了rabbitmq发行版:https://github.com/rabbitmq/rabbitmq-sharding
x-modulus-hash
类型的exchange与 x-consistent-hash
很像,也叫 sharding exchange,即将message在多个队列之间进行分区发送。它的实现方法是根据 routing_key 先获得hash,再用 Hash mod N
得到队列,N就是绑定到exchange上的队列个数。
4. Queue属性
Queue 要先于 Exchange 创建,否则生产者发布的消息,在没有绑定队列之前,会丢失。
已存在的Queue可以重复declare,但前提是属性要相同。
- Name: 队列名称。可以在应用里面指定,或者交给broker生成
- Durable:持久化的Queue在broker重启之后,依然存在。
注意,这里的持久化与消息持久无关。是个 property - Exclusive: 为True时,表示当Consumer的Connection端口之后,队列自动删除。一般由broker生成的随机队列名,指定这个选项 。
排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的 -
Auto-delete: 当最后一个consumer取消订阅之后,队列自动删除
-
Arguments: 设置可选的一些参数,如
-
x-message-ttl
消息在队里里最大存活时间,超过这个ttl就会被丢弃。单位毫秒 -
x-max_length
队列里最多容纳的消息个数,超过这个值,则会从队列头部drop掉消息 -
x-max-priority
设置了这个参数,就表示这是一个具有优先级的队列。它的值是可定义的优先级最大值,一般10以内就够了。
在生产商Publish消息的时候,消息Property上可设置Priority -
x-queue-mode
这个参数是控制是否为”延迟队列”,Lazy Queue是在3.6.0引入的,它会尽量把消息存在磁盘上,节省内存
RabbitMQ一开始的设计初衷,是做异步、解耦,所以会把消息放在内存里面,以便快速的发送给消费者(持久化类型的消息会同时存在于磁盘和内存缓存中)。如果用它来暂时存放大量消息,而不消费或者消费太慢,会导致性能明显下降,因为为了释放内存,消息得swap到磁盘上 —— 会阻塞队列接收新消息。如果内存使用达到broker设置的 water-mark,也会拒绝接收新消息。
Lazy Queue(x-queue-mode=lazy
)的作用就是一接收到新消息,马上存到文件系统,完全避免了前面提到的内存占用。这会增加磁盘I/O(顺序的),与处理持久化类型的消息很相似。 -
x-dead-letter-exchange
死信。当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:- 消息被拒绝(basic.reject or basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列。
死信被重新 requeue 时,可以改变它的routing_key,以便新的队列处理,routing_key用x-dead-letter-routing-key
指定,如果不指定则继续使用消息原来的routing_key。
-
5. Message属性
- routing_key
路由关键字,exchange根据这个关键字进行消息投递 - delivery_mode
- 1: Non-persistent,消息不持久化到磁盘,尽快被消费掉。重启broker之后消息丢失
- 2: Persistent,消息持久化。当然被取走的消息,也就不存在了
- headers
消息头信息,key/value形式,可以认为给消息打上了各种各样的标签。可用于代替 routing_key 去路由(结合headers来下的exchange),或者第三方插件使用。 - properties
实际上 headers 和 delivery_mode 也是properties的一部分,因为使用较多,所以单独拿出去。这里也只提几个:- priority
消息优先级。数字,优先级高的消息会排在队列头部 - correlation_id 和 reply_to
这两个一般用于实现服务间RPC调用, 即生产者发起请求到rabbitmq队列,等待处理结果返回,消费者处理完消息后返回结果给调用方。
reply_to 在消息里面告诉消费者,处理完的结果放到哪个队列,调用方根据 correlation_id 找到结果。详情参考 https://www.rabbitmq.com/tutorials/tutorial-six-python.html - expiration
消息自身的Time-To-Live,用的较少,也叫 Per-Message TTL In Publisher.
前面提到,队列的arguemnts可以设置 x-message-ttl ,也叫 Per-Queue Message TTL In Queues.消息是否过期以两者的最小值为准,并且消息自身过期时间到了之后,不会自动从队列删除,而是在发送给消费者的时候丢弃。
队列自身也有个x-expires
,它指的是队列在多久没有消费者连上来,超过这个时间后队列自动删除。
- priority
- payload: 消息正文
6. 插件
RabbitMQ支持插件式的来扩展功能。
1
|
列举server上安装的所有插件
|
下面是几个常用插件:
-
rabbitmq_management
管理 rabbitmq server 的插件,提供给予HTTP的API和 WebUI,提供管理exchanges、管理queues、管理users、管理policies,监控,发布/接收消息。功能强大,基本是必定开启的插件。
开启管理插件后,也可以选择不使用Web界面,从http://localhost:15672/cli/rabbitmqadmin
下载rabbitmqadmin
命令行工具,它用在一些脚本里面会很方便。(提示: rabbitmqctl 是不能创建exchange和queue,但rabbitmqadmin可以) -
rabbitmq_federation
与MySQL Federated 存储引擎很相似,可以认为 federated exchange 是其它exchange(也叫upstream exchange)的“软连接”、“流量复制”。消息是被publish到上游exchange,然后消费者是从其它broker上的federated exchange订阅消息。
Federated exchanges/queues 是通过 AMQP 协议的Erlang客户端从真实broker里面取数据(不会消费源数据),可以实现跨网络的消息提取,或者将不同地方的消息汇总到一处。应用场景有 broker / cluster 数据迁移,模仿真实数据的线下测试。 -
rabbitmq_shovel
shovel插件就是一个 消费者 + 生产者:从一个queue消费内容,发送到另一个exchange上,甚至可以对消息做些转换。你可以自己实现将消息从源broker消费,重新publish到另一个exchange,但shovel帮我们做好了。 -
rabbitmq_mqtt
实现了 MQTT 3.1 协议的adapter,如文章开头所述。 -
rabbitmq_consistent_hash_exchange
一致性hash exchange,如前文所述。
6. 策略 Policy
首先为什么rabbitmq会有策略这个东西。
前面我们讲到,queue和exchange有一些固定属性,如durable
、exclusive
、auto-delete
等,还有一些可选参数,也叫x-arguments
,如x-max-length
、x-queue-mode
。这些都是客户端在定义队列和交换器时指定的。
如果事后想修改 TTL 或者 queue length limit ,那么得修改应用、重新部署,甚至涉及到删除队列,重新declare。Policy就是解决这个痛点的,在服务端对匹配的 exchanges 或者 queues 设置参数,无需动应用。更多请参考 https://www.rabbitmq.com/parameters.html
一个 policy 包含以下内容:
- name: 策略名字
- pattern: 对哪些queues(exchanges)的应用策略,正则表达式
- definition: 策略内容定义,key/value形式(也可以认为是JSON格式)
- apply-to: 策略应用在什么身上,
queues
、exchanges
、all
。默认是all - priority: 策略优先级,默认0
每个exchange/queue只能“注入”一个policy,所以如果要设置多个策略,把key/value组合成json,定义在一起。设置完成会马上生效,包括后面新创建的exchange、queues。
1
|
将exchange设置为 alternate exchange:(策略名:AE)
|
7. 消息可靠性
有的系统要保证消息不允许丢失,甚至不允许重复,有的系统追求的是高性能,所以要在性能和可靠性之间权衡。rabbitmq在多个层面提供消息可靠性保证。
7.1 持久化
声明持久化的exchange: channel.exchange_delcare(exchange_name, durable)
声明持久化的队列:channel.queueDeclare(queue_name, durable, exclusive, auto_delete, arguments)
发布的持久化消息,投递模式为2: delivery_mode=2
http://www.rabbitmq.com/reliability.html
persistent
7.1 ack & confirm
持久化保证了在broker或者机器出现异常的时候,消息不会丢失,要保证发送者在pub消息、接收sub消息时出现网络异常,客户端也应该有相应的处理。
Consumer Delivery Acknowledgements
rabbitmq对Consumer处理消息提供 acknowledgements 确认机制,客户端通过basic.consume
注册到broker(push),或者通过basic.get
pull 消息,都可以在指定是否开启ack
。
delivery tags是实现 ack 的关键,RabbitMQ会用 basic.deliver
方法向消费者推送消息,这个方法携带了一个 delivery tag,它是单调递增的正整数,在一个channel中唯一代表了一次投递。
确认模式包括自动确认和手动确认。
自动确认就是rabbitmq一旦把消息发送出去后,就认为成功,完成确认。此模式性能最高,只要消费者能处理的过来,但自然降低了消息到达处理的可靠性,比如一个消息还在路上,消费者的TCP连接或者channel就关闭了,那么消息也就丢失。如果消费者处理不过来,可能会导致消息在客户端挤压,内存过载,引发异常。所以自动确认一般用在消息比较平稳、客户端能处理的来的系统。
手动确认,就是客户端需要自己发送确认命令,包括:
- basic.ack —— 确认成功,客户端成功处理
- basic.nack —— 确认失败,客户端处理失败,但依然删掉消息
- basic.reject —— 确认失败,客户端处理失败,消息不删除,可重新发送。
手动确认模式,可以控制消息处理的速度(流控QoS),通过 prefetch 设置该channel上最大没有确认的消息数,server会等待有空闲的配额时才继续发送给消费者。
手动确认模式如果不设置 prefetch_count,那么消费者可能会接收许多的消息但未ack,从而导致内存耗尽,所以这点需要小心。正常来说,100-300是个比较可控的范围。(当然如果是 pull 模式,就不存在QoS一说)
basic.ack
和basic.nack
可以设置 multiple
字段,批量确认来减少网络传输。比如说在信道 ch
上有 delivery tags 5, 6, 7, 8 没有确认,当客户端发回的确认帧是8并且 multiple=true,那么5-8的tags都被ack。
在启用手动确认时,发生网络连接断开或者消费者崩溃,而无法返回 ack/nack 命令时,(检测方法是 heartbead)rabbitmq会自动将没有确认的消息 requeue,所以客户端处理消息时,最好能满足幂等性,即能够重复处理这些消息。
Publisher Confirms
rabbitmq对Producer发布消息提供 confirm 机制:客户端可以发送一个 confirm.select
命令将channel设置成confirm
工作模式。
所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(basic.ack),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。
如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息,确保消息不会再发送之前就丢失。
然后对于需要持久化的消息的确认,不能完全保证数据被刷到磁盘上,因为每个消息调用 fsync 的带来的IO代价太高,rabbitmq会每隔几百毫秒,批量将消息从文件系统缓存 fsync 刷到磁盘。(了解MySQL的话对这个应该不陌生)
7.2 事务
RabbitMQ 实现了AMQP 0-9-1协议里的事务,这样说唯一能确保消息不丢失的方式,信道可以设置成 transaction 模式:发布消息,commit/rollback消息。
但是事务在这里太重了,而且会极大的降低性能。不用。
7.3 rabbitmq分布式
待聊
5. python使用示例
https://pika.readthedocs.io/en/0.10.0/intro.html
下面的示例是使用Maxwell或者MySQL binlog增量流,json数据进入rabbitmq,然后通过 pika —— python版本的rabbitmq client,重新组装成sql,达到数据增量同步的效果。
1
|
def binlog_sync(self):
|
参考
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- http://www.rabbitmq.com/admin-guide.html
- https://geewu.gitbooks.io/rabbitmq-quick/content/index.html
- http://blog.csdn.net/anzhsoft/article/details/19607841
- http://dbaplus.cn/news-141-1464-1.html