https://blog.csdn.net/hry2015/article/details/79016854
1. 概述
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。本文的包含的主要内容如下:
- RabbitMQ的安装和管理界面
- 第一个Hello World程序
- 详细介绍RabbitMQ中消息代理(message brokers)、连接(Connections)、通道(Channels)、队列(queue)、发送消息和消息属性、发送/接收消息及使用注意事项
- 1
- 2
- 3
2. RabbitMQ 概述
RabbitMQ是一个消息代理(message brokers),它负责从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)
如果要了解RabbitMQ,最好学习一下AMQP协议,会对理解RabbitMQ工作原理有很大帮助,详细见AMQP 0-9-1 协议简介
3. 安装RabbitMQ和管理界面
- 安装RabbitMQ服务并启动,参考这篇文章: Ubuntu 16.04 安装 RabbitMQ
- 安装管理界面插件
#启用rabbitmq-management插件:
sudo rabbitmq-plugins enable rabbitmq_management
使用guest/guest帐号登录管理界面 http://127.0.0.1:15672 - 创建新的帐号 hry/hry,并设置类型为Administrator
4. 第一个HelloWold程序
第一个RabbitMQ的HelloWorld实现如下功能,生产者发送一个消息,消息者接收到此消息:
5. 发布者(publishers)亦称生产者(producers)
负责发布消息,本节介绍发布者的代码和相关的方法
业务逻辑如下:
1 配置连接工厂
2 建立TCP连接
3 在TCP连接的基础上创建通道
4 声明一个队列
5 发送消息
5.1. 连接(Connections)和通道(Channels)
生产者和消费者是使用TCP和RabbitMQ建立长连接。但是TCP是非常耗费资源,为了减少TCP连接的数据,RabbitMQ提出通道的概念。在同一个TCP连接上可以建立多个通道,即可以把通道理解成共享一个TCP连接的多个轻量化连接。通道不是线程安全的,不推荐多个线程共用一个通道。在多线程/进程应用中,为每个线程/进程开启一个通道(channel),并且这些通道不能被线程/进程共享。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
以下功能的代码如下:
1 配置连接工厂
2 建立TCP连接
3 在TCP连接的基础上创建通道
// 配置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
Connection connection = null;
Channel channel = null;
// 建立TCP连接
connection = factory.newConnection();
// 在TCP连接的基础上创建通道
channel = connection.createChannel();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
5.2. 队列(queue)
用于存储即将要发送的消息(message)。它的大小理论上是无限的,唯一限制大小是RabbitMQ的内存和磁盘大小.队列在声明(declare)后才能被使用。声明队列操作是幂等操作,即如果一个队列尚不存在,声明一个队列会创建它,如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响,如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
创建队列方法:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException
- 1
- 2
队列配置参数说明:
第一个参数queue
定义队列的名称,最多255字节的一个utf-8字符串。如果此参数为空字符串,则RabbitMQ会自动生成一个唯一的队列名,在同一个通道的后续的方法中,我们可以使用空字符串来表示之前生成的队列名称,之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称
以”amq.”开始的队列名称被预留做消息代理内部使用,不能被应用使用,否则抛出403 (ACCESS_REFUSED)错误
第二个参数 durable
是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
第三个参数 execulusive
表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参数的优先级高于durable
第四个参数 autoDelete
当没有生成者/消费者使用此队列时,此队列会被自动删除。
(即当最后一个消费者退订后即被删除)
第五个参数 arguments
其它的扩展属性,如一些消息代理用他来完成类似与TTL功能时相关的参数
声明一个队列代码如下:
channel.queueDeclare(QUEUE_NAME+4, false, false, false, null);
- 1
备注:还有一些特殊队列
- 临时队列:等价于建立一个durable=false,execulusive=true,autoDelete=true,名称为随机的队列
- 内部队列:以”amq.”开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出
5.3. 发送消息和消息属性
发送消息方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
- 1
详细参数说明:
第一个参数exchange:
执行队列使用的交换机(exchange),交换机拿到一个消息之后将它路由给一个或零个队列。关于这块的内容后面会详细说明。如果值为空字符串,则表示使用默认交换机。默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
第二个参数routingKey:
指定路由键,如果使用默认交换机,则此值和队列名称相同
第三个参数props:
除了Routing key外,消息还可以配置如下属性,常用属性如下:
○ Content type:内容类型
○ Content encoding:内容编码
○ headers; 消息的头信息,类似http协议中的header属性
○ Delivery mode (persistent or not) 投递模式(持久化 或 非持久化): 如果此值是persistent ,则此消息存储在磁盘上。如果服务器重启,系统会保证收到的持久化消息未丢失,将消息以持久化方式发布时,会对性能造成一定的影响
○ Message priority:消息优先级
○ String correlationId;
○ ReplyTo; 反馈队列
○ Expiration period: 消息有效期
○ String messageId;
○ Message publishing timestamp:消息发送的时间戳
○ String type;
○ String userId;
○ Publisher application id:发布应用的ID
○ String clusterId;
第四个参数body:
消息的有效负载pPayload(消息实际携带的数据),它被RabbitMQ当作不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。
发送消息代码如下
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
- 1
5.4. 交换机(Exchange)
虽然代码没有出现交换机的配置,但是实际我们使用了交换机。
交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。RabbitMQ的提供了四种交换机
- Direct exchange(直连交换机)
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
我的例子里只使用了默认交换机:默认交换机(default exchange)实际上是一个由消息代理预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
其他的交换机,我们后续文章再介绍。
6. 消费者(consumers)
接收并处理消息,业务如下
- 配置连接工厂
- 建立TCP连接
- 在TCP连接的基础上创建通道
- 声明一个队列
- 接收消息并处理
以上代码和发送者部分类似,这是略,只列出不同的部分
6.1. 接收消息并消费
Consumer
消息的接收是异常的,通过com.rabbitmq.client.Consumer定义回调方法。回调接口用于处理订阅的消息。DefaultConsumer是此接口的默认,官方也推荐如果要实现自己的Consumer,则继承此类
// 默认消费者实现
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [HelloworldRecv] Received '" + message + "'");
}
};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
接收消息方法
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- 1
详细参数说明:
第一个参数queue
要接收消息的队列名称
第二个参数autoAck
消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉,而且网络原因也有可能引起各种问题。为了解决这个问题,RabbitMQ给我们两种建议:
○ 自动确认模式(automatic acknowledgement model):当RabbitMQ将消息发送给消费者后,立即从内存中删除消息。
○ 显式确认模式(explicit acknowledgement model):只有消费者发送确认回执(acknowledgement),RabbitMQ才删除消息。如果一个消费者在尚未发送确认回执的情况下挂掉了,那RabbitMQ会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,RabbitMQ会死等下一个注册到此队列的消费者,然后再次尝试投递。
第三个参数callback
指定处理消息的回调类
接收消息代码如下
// 接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
- 1
- 2
7. 完整代码和测试
生产者的完整代码
HelloworldSend
消费者完整代码
HelloworldRecv
测试代码完整代码
测试如下场景:一个消费者启动并接收消息,一个生产者启动并发送消息
public class BasicTest {
// 测试线程池
private ExecutorService executorService = Executors.newFixedThreadPool(10);
// rabbitmq的IP地址
private final String rabbitmq_host = "10.240.80.147";
// rabbitmq的用户名称
private final String rabbitmq_user = "hry";
// rabbitmq的用户密码
private final String rabbitmq_pwd = "hry";
@Test
public void helloworld() throws InterruptedException {
// 接收端
executorService.submit(() -> {
HelloworldRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd);
});
Thread.sleep(5* 100);
// 发送端
executorService.submit(() -> {
HelloworldSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd);
});
// sleep 10s
Thread.sleep(10 * 1000);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
测试
执行BasicTest 的helloworld()方法,结果如下
[HelloworldRecv] Waiting for messages.
[HelloworldSend] Sent 'Hello World!1515574041536'
[HelloworldRecv] Received 'Hello World!1515574041536'
- 1
- 2
- 3