一、交换机
1.1 作用
Exchange(交换机) 的作用就是接收消息并根据路由键转发消息到绑定的队列。
1.2 交换机常用属性
属性 | 含义 |
---|---|
Name | 交换机名称 |
Type | 交换机类型,direct、topic、fanout、headers等,它们本质都一样,只是消息转发的逻辑不同 |
Durability | 是否持久化,true 为持久化 |
Auto Delete | 当最后一个绑定到 Exchange 上的队列删除后,自动删除该 Exchange |
Internal | 当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 false |
Arguments | 扩展参数,用于扩展 AMQP 协议自制定化使用 |
二、不同类型的交换机
2.1 Direct Exchange
**Direct Exchange (直连型交换机) ** 是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:
1) 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key);
2) 当一个携带着路由值为 R 的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为 R 的队列。
Direct 模式可以使用 RabbitMQ 自带的 Exchange 即 default Exchange
,所以不需要将 Exchange 进行任何绑定(binding) 操作,消息传递时,RouteKey 必须完全匹配才会被队列接受,否则消息会被丢弃。
我们之前的实例使用的就是 default Exchange。
实例:
消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "direct";
// 4.声明交换机
String exchangeName = "directExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName = "directQueue";
channel.queueDeclare(queueName, false, false, false, null);
// 6.将交换机和队列进行绑定关系
// 注意:我们只绑定一个路由KEY,说明另一个路由不会被消费掉
String routingKey = "directA";
channel.queueBind(queueName, exchangeName, routingKey);
// 7.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
// 8.循环消费
System.err.println("消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端消费: " + msg);
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.循环发送消息
// 声明交换机
String exchangeName = "directExchange";
// 发送消息
String routingKey1 = "directA";
String msg1 = "directA 消息";
channel.basicPublish(exchangeName, routingKey1, null, msg1.getBytes());
String routingKey2 = "directB";
String msg2 = "directB 消息";
channel.basicPublish(exchangeName, routingKey2, null, msg2.getBytes());
// 5.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
先启动消费者,然后启动生产者,查看控制台输出:
消费端启动
消费端消费: directA 消息
可以看到未被绑定的 routingKey
消息未被消费掉。
2.2 Fanout Exchange
Fanout Exchange 是扇型交换机,它不处理路由键,只需要将队列绑定到交换机上。任何发送到 Fanout Exchange
的消息都会被转发到与该 Exchange
绑定(Binding)的所有 Queue
上。它的转发消息是最快的。
实例:
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "fanout";
// 4.声明交换机
String exchangeName = "fanoutExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName1 = "fanoutQueue1";
channel.queueDeclare(queueName1, false, false, false, null);
String queueName2 = "fanoutQueue2";
channel.queueDeclare(queueName2, false, false, false, null);
// 6.将交换机和队列进行绑定关系,不需要设置 routingKey
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
// 7.创建消费者并消费
new Thread(() -> {
try {
handleConsumer(channel,queueName1);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
handleConsumer(channel,queueName2);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private static void handleConsumer(Channel channel, String queueName) throws Exception {
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
// 7.循环消费
System.err.println(queueName + " 消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println(queueName + "消费端消费: " + msg);
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.声明交换机
String exchangeName = "fanoutExchange";
// 5.循环发送消息
for (int i = 0; i < 5; i++) {
String msg = "消息"+i;
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
// 6.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
先启动消费者,然后启动生产者,查看控制台输出:
fanoutQueue1 消费端启动
fanoutQueue2 消费端启动
fanoutQueue1消费端消费: 消息0
fanoutQueue1消费端消费: 消息1
fanoutQueue2消费端消费: 消息0
fanoutQueue1消费端消费: 消息2
fanoutQueue2消费端消费: 消息1
fanoutQueue2消费端消费: 消息2
fanoutQueue1消费端消费: 消息3
fanoutQueue1消费端消费: 消息4
fanoutQueue2消费端消费: 消息3
fanoutQueue2消费端消费: 消息4
2.3 Topic Exchange
Topic Exchange(主题交换机) 会将所有发送到 Topic Exchange 的消息转发到所有关心 RouteKey 中指定的Topic 的 Queue 上。
Exchange 将 RouteKey 和某 Topic 进行模糊匹配。使用这种类型,队列需要绑定一个 Topic。
注:可以使用通配符进行模糊匹配
#
匹配一个或多个词*
匹配一个词
实例:
消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "topic";
// 4.声明交换机
String exchangeName = "topicExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName1 = "topicQueue1";
channel.queueDeclare(queueName1, false, false, false, null);
String queueName2 = "topicQueue2";
channel.queueDeclare(queueName2, false, false, false, null);
// 6.将交换机和队列进行绑定关系
// 匹配一个或多个词
String routingKey1 = "user.#";
channel.queueBind(queueName1, exchangeName, routingKey1);
// 匹配一个词
String routingKey2 = "order.*";
channel.queueBind(queueName2, exchangeName, routingKey2);
// 7.创建消费者并消费
new Thread(() -> {
try {
handleConsumer(channel,queueName1);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
handleConsumer(channel,queueName2);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private static void handleConsumer(Channel channel, String queueName) throws Exception {
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
// 7.循环消费
System.err.println(queueName + " 消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println(queueName + "消费端消费: " + msg);
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.声明交换机
String exchangeName = "topicExchange";
// 5.发送消息
// 模糊匹配多个词
channel.basicPublish(exchangeName, "user.a", null, "user.a".getBytes());
channel.basicPublish(exchangeName, "user.a.b", null, "user.a.b".getBytes());
// 模糊匹配一个词
channel.basicPublish(exchangeName, "order.a", null, "order.a".getBytes());
channel.basicPublish(exchangeName, "order.a.b", null, "order.a.b".getBytes());
// 6.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
先启动消费者,然后启动生产者,查看控制台输出:
topicQueue2 消费端启动
topicQueue1 消费端启动
topicQueue1消费端消费: user.a
topicQueue1消费端消费: user.a.b
topicQueue2消费端消费: order.a
2.4 Headers Exchange
Headers Exchange(头交换机) 不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配。
在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。
不经常使用,了解即可。
三、Dead Letter Exchange(死信交换机)
3.1 死信模式
死信模式指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预,这个过程中的 exchange 和 queue 就是所谓的 Dead Letter Exchange 和 Queue。
3.2 死信消息生成原因
消息变成死信有以下几种情况:
- 消费者对消息使用了 **basicReject ** 或 者 basicNack 回复,并且 requeue 参数设置为 false,即不再将该消息重新在消费者间进行投递.
- 消息由于消息有效期(per-message TTL)过期
- 消息由于队列超过其长度限制而被丢弃
3.3 实例
死信消费者:
public class DeadLetterConsumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.声明死信交换机
String exchangeName = "deadLetterExchange";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName = "deadLetterQueue";
channel.queueDeclare(queueName, false, false, false, null);
// 6.将交换机和队列进行绑定关系
String routingKey = "#";
channel.queueBind(queueName, exchangeName, routingKey);
// 7.创建消费者消费
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
System.err.println("死信消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("死信消费端消费: " + msg);
}
}
}
业务消费者:
public class BusinessConsumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "topic";
// 4.声明交换机
String exchangeName = "businessExchange";
channel.exchangeDeclare(exchangeName, exchangeType, false, false, false, null);
// 5.申明队列
String queueName = "businessQueue";
Map<String, Object> arguments = new HashMap();
// 指定死信交换机名
arguments.put("x-dead-letter-exchange", "deadLetterExchange");
// 如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。
// 如果没有设置,则保留该消息原有的路由key
// arguments.put("x-dead-letter-routing-key", "business.#");
channel.queueDeclare(queueName, true, false, false, arguments);
// 5.将交换机和队列进行绑定关系
String routingKey = "business.#";
channel.queueBind(queueName, exchangeName, routingKey);
// 6.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// autoAck = false 设置手动确认消费
channel.basicConsume(queueName, false, queueingConsumer);
// 7.循环消费
System.err.println("业务消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("业务消费端接收消息: " + msg);
// 设置消费失败
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.循环发送消息
// 声明交换机
String exchangeName = "businessExchange";
// 声明路由
String routingKey = "business.order";
for (int i = 0; i < 5; i++) {
String msg = "business 消息" + i;
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
// 5.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
- 启动死信消费者
- 启动业务消费者
3)启动生产者 - 观察控制台输出
业务消费者控制台输出:
业务消费端启动
业务消费端接收消息: business 消息0
业务消费端接收消息: business 消息1
业务消费端接收消息: business 消息2
业务消费端接收消息: business 消息3
业务消费端接收消息: business 消息4
死信消费者控制台输出:
死信消费端启动
死信消费端消费: business 消息0
死信消费端消费: business 消息1
死信消费端消费: business 消息2
死信消费端消费: business 消息3
死信消费端消费: business 消息4
可以看到处理失败的消息已经传递到死信交换机中,并被死信消费者消费。
从上面的代码,我们可以知道死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型 Direct、Fanout、Topic。一般来说,会为每个业务队列分配一个独有的 路由key ,并对应的配置一个死信队列进行监听。
3.4 死信的处理方式
死信的产生是不可避免,我们需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
- 丢弃,如果不是很重要,可以选择丢弃
- 记录死信入库,然后做后续的业务分析或处理
- 通过死信队列,由负责监听死信的应用程序进行处理