RabbitMQ作为应用程序之间通信的工具,越来越受欢迎,下面结合介绍一下RabbitMQ中一些简单的概念。建议初学者可以看一下RabbitMQ官方教程和官方在GitHub上提供的样例代码。
几个重要概念
生产者
生产者也叫客户端,不是RabbitMQ的一部分,他创建消息,并将消息发送到给消息代理RabbitMQ。消息主要分为两部分:有效载荷和标签。有效载荷为要传输的数据,标签用于标明一个交换器(下面会讲)的名称和消息的主题。
消费者
消费者也叫服务端,同样不是RabbitMQ的一部分,消费者订阅到队列上,消费消息。
信道
首先必须连接到RabbitMQ才能发布或消费消息。那么就需要应用程序和Rabbit之间建立一条TCP连接。一旦TCP连接打开,就建立了一条信道。但是频繁的建立和断开TCP连接会对系统造成很重的负担而且操作系统每秒建立的TCP连接是有限的。所以RabbitMQ中的信道是建立在TCP上的虚连接。RabbitMQ为每个信道指派一个ID,发布,订阅和接收消息都是在指定的信道上完成的。
队列
RabbitMQ 主要包含三个部分:交换器,队列和绑定。生产者吧消息发布到交换器上(注意是交换器,不是队列),消息最终到达队列(期间有类似路由的过程),并被消费者接收。
交换器
交换器类似网络中的路由器,队列通过路由键绑定到交换器,RabbitMQ 根据之前发送消息时定义的路由键——route key,确定将消息投递到哪个队列。如果路由的消息不匹配所有绑定的路由键,消息将会被丢弃。交换器主要有headers,direct,fanout,topic四种。
headers
headers交换器时使用的比较少的一种交换器,他是一种忽略route key使用headers的键值对匹配。
direct
direct交换器非常简单,如果路由键匹配的话,消息就被投递到对应的队列。使用方法有两种,一种是单一绑定,另一种是多绑定。
单一绑定
意思是,不同的队列绑定到同一个交换器上。这样当不同route key 的消息到达交换器时,交换器根据不同的route key 将消息放到不同的队列中。
多绑定
将两个不同的队列用相同的route key 绑定在同一个交换器上,这是交换器会向具有相同的route key的队列上广播匹配route key 的消息。这种设置的作用在fanout交换器中介绍。
fanout
和 direct的多绑定相同,它会将受到的消息广播给所有绑定的队列上。这用交换器的作用是允许对单条消息作出不同的反应。例如,当你在淘宝下一个订单时,首先淘宝要告诉银行向商家转一笔钱,同时还要告诉商家要准备商品,并发货。在这种情况下,一个下订单的消息发送到RabbitMQ中后,需要作出两个反应:转账和准备商品。而fanout模式的很好的解决了这个问题。rabbit将消息分别发送到银行和商家绑定的两个队列就可完成这个功能。
topic
topic,顾名思义就是根据队列在绑定到交换器时,定义了自己感兴趣的话题。他的route key 支持正则匹配方式。例如"stock.used.nyse","nyse.vmw"。你可以根据喜好在route key 中包含多个用“点”隔开的单词(最多255个)。
- *(star) 表示只替代一个单词,例如"chris.*",可以匹配"chris.name" 但不可以匹配"chris" 或者"chris.age.eight"
- #(hash) 表示可以匹配0个或多个单词,例如"chris.#",可以匹配"chris",也可以匹配"chris.age.eight",还可以匹配"chris.name"
绑定
绑定的意思就是将队列绑定到交换器上,当消息到达交换器后,会根据不同交换器的类型,以及在绑定时定义的route key,将消息分发到不同的队列中取。在java API中,代码如下:
// only care about the message with bindingKey
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
代码样例
下面用RabbitMQ官方提供的代码(文章开头的GitHub)来解释一下Java API的使用。
- 消息发送者(生产者)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 在连接上创建信道
Channel channel = connection.createChannel();
// 声明交换器参数意义:交换器名称,交换器类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
// 声明队列 参数的意义依次是:
// 队列名称: 在往交换器上绑定时使用
// durable: 如果为true的话,队列中的消息在关闭rabbit服务时会持久化,
// exclusive: 设置为true的话,队列为私有的只有当前程序才能消费队列消息
// autoDelete 当最后一个消费者取消订阅是,队列自动删除
// arguments: 队列的其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 发布消息 参数意义如下:
// exchange: 消息发布到那个交换器上
// route key: 路由键,exchange 根据其路由消息
// props: 消息的其他属性,headers 会用到
// body: 消息内容
channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
- 消息接收者(消费者)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 消费者端的代码不需要声明交换器
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
// 队列需要有消费者,消息发送者不需要
// 第二个参数为auto_ack: 设置为true时,一旦消费者接收了消息,RabbitMQ会自动视为其确认,如果确认了消息将从队列中移除。
channel.basicConsume(QUEUE_NAME, true, consumer);