几个基本概念
Producer
生产者,发送消息的一方,图中左侧的client。
Consumer
消费者,接收消息的一方,图中后侧的client。
Broker
消息中间件的服务节点,一般一个RabbitMQ Broker看成一台RabbitMQ服务器。
消息
消息包含两部分:消息体和标签。消息体(payload)是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。消息的标签用来表述这条消息 , 比如一个交换器的名称和一个路由键。 生产者把消息交由 RabbitMQ, RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者。
连接
连接其实是一条TCP连接,如果是生产者还是消费者都需要和Broker建立连接。
信道
信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
为什么还要引入信道呢?试想这 样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那 么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和 销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 类似 NIO' (Non-blocking 1/0) 的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
队列
队列是 RabbitMQ 的内部对象,用于存储消息,当多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询) 给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。
交换机
生产者将消息发送到 Exchange (交换器,通常也 可以用大写的 "X" 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或 许会返回给生产者,或许直接丢弃。
路由键
生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey,用 来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。
绑定
RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey), 这样 RabbitMQ 就知道如何正确地将消息路由到队列了。
生产者将消息发送给交换器时, 需要一个 RoutingKey, 当 BindingKey 和 RoutingKey 相匹配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的 BindingKey。 BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型, 比 如 fanout 类型的交换器就会无视 BindingKey, 而是将消息路由到所有绑定到该交换器的队列中 。
虚拟主机
每个Vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列,交换机和绑定,它拥有自己的权限机制。
交换器类型
RabbitMQ 常用的交换器类型有 fanout、 direct、 topic、 headers 这四种。
fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,相当于广播模式。
direct
把消息路由到那些 BindingKey 和 RoutingKey完全匹配的队列中。
topic
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中。
headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
生产者和消费者的流转过程
生产者流转过程
消费者流转过程
示例代码
package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = " routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection();// 创建连接 Channel channel = connection.createChannel(); // 创建信道 //创建一个 type="direct"、持久化的、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 创建一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换器与队列通过路由键绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送一条持久化的消息: Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 关闭资源 channel.close(); connection.close(); } } package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[] { new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 这里的连接方式与生产者的 demo 略有不同, 注意辨别区别 Connection connection = factory.newConnection(addresses); // 创建连接 final Channel channel = connection.createChannel(); // 创建信道 channel.basicQos(64); // 设置客户端最多接收未被 ack 的消息的个数 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); // 等待回调函数执行完毕之后, 关闭资源 TimeUnit.SECONDS.sleep(15); channel.close(); connection.close(); } }
几个关键API的定义和参数读解
exchangeDeclare
exchange: 交换器的名称。
type: 交换器的类型,常见的如 fanout、 direct、 topic
durable: 设置是否持久化。 durable 设置为 true 表示持久化, 反之是非持久化。持 久化可以将交换器存盘,在服务器重启 的时候不会丢失相关信息。
autoDelete: 设置是否自动删除。 autoDelete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个交换器绑
定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: "当与此交换器 连接的客户端都断开时, RabbitMQ 会自动删除本交换器"。
internal : 设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程 序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument: 其他一些结构化参数,比如 alternate-exchange (备份交换机)
exchangeDeclare
Queue. DeclareOk queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Str工ng , Object> arguments) throws IOException;
queue : 队列的名称。
durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候可以保证不丢失相关信息。
exclusive: 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection) 可见的,同一个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
argurnents: 设置队列的其他一些参数,如 x-rnessage-ttl 、 x-expires 、x-rnax-length、x-rnax-length-bytes 、x-dead-letter-exchange、x-dead-letter-routing-key, x-rnax-priority 等。
queueBind
将队列和交换器绑定起来
Queue . BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
queue: 队列名称:
exchange: 交换器的名称:
routingKey: 用来绑定队列和交换器的路由键;
argument: 定义绑定的一些参数。
exchangeBind
将交换器与交换器绑定,绑定之后, 消息从 source 交 换器转发到 destination 交换器,某种程度上来说 destination 交换器可以看作一个队列。
Exchange . BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
channel . exchangeDeclare( "source" , "direct", false, true, null) ; channel . exchangeDeclare("destination", "fanout" , false, true, null); channel.exchangeBind("destination" , "source" , "exKey"); channel . queueDeclare( "queue", false, false, true, null); channel . queueBind("queue" , " dest工nation " , "") ; channel.basicPublish( "source" , "exKey" , nul l , "exToExDemo". getBytes ()) ;
basicPublish
void basicPublish(String exchange, String routingKey, boo1ean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
exchange: 交换器的名称,指明消息需要发送到哪个交换器中 。 如果设置为空字符串, 则消息会被发送到 RabbitMQ 默认的交换器中。
routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中 。
props : 消息的基本属性集,其包含 14 个属性成员,分别有 contentType 、contentEncoding、 headers (Map<String , Object>) 、 deliveryMode、 priority、correlationld、 replyTo、 expiration、 messageld、 timestamp、 type、 userld、appld、 clusterld。
byte [] body: 消息体 (payload),真正需要发送的消息。
basicConsume
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String ,Object> arguments, Consumer callback) throws IOException;
queue: 队列的名称:
autoAck: 设置是否自动确认。建议设成 false,即不自动确认:
consumerTag: 消费者标签,用来区分多个消费者:
noLocal : 设置为 true 则表示不能将同一个 Connection中生产者发送的消息传送给 这个 Connection 中的消费者:
exclusive : 设置是否排他:
arguments : 设置消费者的其他参数:
callback: 设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer, 使用时需要客户端重写 (override) 其中的方法。
对于消费者客户端来说重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法, 具体如下:
void handleConsumeOk(String consumerTag) ;
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) ;
void handleRecoverOk(String consumerTag);
basicQos
允许限制信道上的消费者所能保持的最大 未确认消息的数量。
void basicQos(int prefetchSize, int prefetchCount, boo1ean global)
prefetchSize 这个参数表示消费者所能接收未确认消息的总体大小的上限,单位为 B
prefetchSize 以确保发送的消息都没有超过所限定的 prefetchCount 的值
例子:
channel .basicQos(3, false); // Per consumer limit
channel.basicQos(5, true); // Per channel limit
channel .basicConsume("queuel", false, consumerl) ;
channel.basicConsume("queue2", false, consumer2) ;
那么这里每个消费者最多只能收到 3 个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为 5。在未确认消息的情况下,如果 consumerl 接收到了消息 1 、 2 和 3,那么 consumer2至多只能收到 11 和 12。如果像这样同时使用两种 global 的模式,则会增加 RabbitMQ 的负载,因为 RabbitMQ 需要更多的资源来协调完成这些限制。如无特殊需要,最好只使用 global 为 false 的设置,这也是默认的设置。
消费端的确认与拒绝
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息 (任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 RabbitMQ 会一直 等待持有消息直到消费者显式调用 Basic.Ack 命令为止。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可 能还是原来的那个消费者。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的 消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
消息拒绝:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中 deliveryTag 可以看作消息的编号 ,它是一个 64 位的长整型值,最大值是 9223372036854775807。如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入 队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject 命令一次只能拒绝一条消息 ,如果想要批量拒绝消息 ,则可以使用 Basic.Nack 这个命令。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中 deliveryTag 和 requeue 的含义可以参考 basicReject 方法。 multiple 参数设置为 false 则表示拒绝编号为 deliveryT坷的这一条消息,这时候 basicNack 和 basicReject 方法一样; multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所 有未被当前消费者确认的消息。
消息恢复:
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。 如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息 来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条消 息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于
channel.basicRecover(true) ,即 requeue 默认为 true