一 mq简介(message queue)
消息队列技术是分布式应用间交互信息的一种技术。
消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走
通过消息队列,应用程序可独立地执行-它们不需要知道彼此的位置,或在继续执行前不需要等待程序接收此消息
它接受收据,存储消息数据,转发消息
使用场景: 解耦,提高峰值处理能力,送达和排序保证,缓冲等
二 mq术语
1 producer
消息发送者,在mq称为生产者
2 consumer
消息接收者,在mq称为消费者
3 Queue
消息存储在队列(queue)中, 一个队列不受任何限制,它可以存储你想要存储的消息量,它本质上是一个无限的缓冲区。
多个生产者可以向同一个队列发送消息,多个消费者可以尝试从同一个队列中接收数据
在rabbitmq中,队列消息可以设置为持久化, 临时或自动删除
(1) 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
(2) 设置为临时队列,queue中的数据在系统重启之后就会丢失
(3) 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除
4 Exchange
Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是直接通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和queue一样,exchange也可设置为持久化,临时或者删除。
Exchange又下面4种类型,direct(默认),fanout, topic, headers,分别对应不用类型的转发策略
(1) direct
直接交互器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的queue
(2) fanout
广播式交互器,不管消息的ROUTING_KEY设置为什么,Exchange都会讲消息转发给所有绑定的Queue
(3) topic
主题交互器,工作方式类似于组播。Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
(4) headers
消息体的header匹配(ignore)
5 Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
6 virtual host
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。
三 通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
- P1生产消息,发送给服务器端的Exchange
- Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
- Queue1收到消息,将消息发送给订阅者C1
- C1收到消息,发送ACK给队列确认收到消息
- Queue1收到ACK,删除队列中缓存的此条消息
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
- 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
- 如果consumer接收了消息,但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
- 如果consumer接收了消息,但是程序中有bug而忘记了ack,rabbitmq会重复发送消息。
- rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以设置参数中的reject为true达到目的,那么rabbitmq将会把消息发送给下一个注册的consumer。
Exchange和Queue是最核心的组件,Exchange和Queue是在rabbitmq server端(又叫broker端),producer和consumer在应用端
生产者:
首先创建一个connection通过socket连接去和服务器连接起来(需要传目的服务器的ip,用户名,密码)
接着创建一个channel,这是大部分要做的事情所在
要发送消息,必须声明一个队列,然后我们可以向队列发布消息
消息的内容是字节数组,在使用时,注意编码问题。
消费者:
当队列里有消息时,消费者要随时能够从队列里获取消息,所以我需要一直运行它,让它监听消息。
生产者要发送消息,一定要事先知道消费消息的程序的对列是哪个。所以,在运行生产者程序前,需要先启动消费者程序。因此声明队列,就应该在消费者程序中完成。
这个好好看下: 写的好到爆