死信队列
DLX(Dead-Letter-Exchange),可以称为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。
消息变成死信队列有下面几个情况:
- 消息被拒绝(channel.basicNack或channel.basicReject),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
在声明队列时,在队列参数属性中指定DLX,RabbitMQ就会将死信重新发布到指定的DLX上,从而被路由到死信队列中。通过监听这个队列的消息进行处理,并将消息的TTL设置为0配合使用可以弥补immediate
参数的功能。
-
通过在
channel.queueDeclare
方法添加DLX://创建DLX channel.exchangeDeclare("dlx_exchange", BuiltinExchangeType.DIRECT, true); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //为队列添加DLX channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments);
也可以为DLX指定路由键,如果没有指定,则使用原队列的路由键:
arguments.put("x-dead-letter-routing-key", DLX_QUEUE);
下面代码设置了TTL、DLX、DLK(DLX路由键):
public class Send { final static String NOREMAL_EXCHANGE = "normal_exchange"; final static String NOREMAL_QUEUE= "normal_queue"; final static String DLX_EXCHANGE= "dlx_exchange"; final static String DLX_QUEUE= "dlx_queue"; public static void main(String[] args) { Connection connection = null; Channel channel; try { connection = ConnectionUtils.getConnection(); channel = connection.createChannel(); //正常交换器 channel.exchangeDeclare(NOREMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true); //创建死信交换器 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true); //设置正常队列参数,添加死信队列 Map<String, Object> arguments = new HashMap<>(); //队列消息过期时间 arguments.put("x-message-ttl", 1000); //设置死信队列 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //设置DLX路由键,不设置,则使用原队列的路由键 arguments.put("x-dead-letter-routing-key", DLX_QUEUE); channel.queueDeclare(NOREMAL_QUEUE, true, false, false, arguments); channel.queueBind(NOREMAL_QUEUE, NOREMAL_EXCHANGE, NOREMAL_QUEUE); channel.queueDeclare(DLX_QUEUE, true, false, false, null); channel.queueBind(DLX_QUEUE,DLX_EXCHANGE,DLX_QUEUE); channel.basicPublish(NOREMAL_EXCHANGE, NOREMAL_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, "死信队列".getBytes("utf-8")); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
代码里创建了两个交换器
normal_exchange
和dlx_exchange
,分别绑定了两个队列normal_exchange
和dlx_exchange
。运行上面代码后,10秒后,在Web管理页面中,可以看到如下内容。在10秒消息过期后变成死信,消息发布到了死信队列。也可以看到第二列中的
D
、TTL
、DLX
标记,D
为持久化durable
。
延时队列
延时队列存储的对象是对应的延迟消息,延迟消息指当消息被发送到队列后,并不立即被消费者拿到消息,而是等待特定的时间后,消费者才能拿到消息。
延时队列应用场景有:
- 在订单系统中,用户下单后,通常有30分钟的付款时间。如果30分钟后没有付款,则订单被取消,这里可以使用延时队列处理订单。
- 用户通过远程控制家里的智能设备在指定时间进行工作,可以将用户指定发送到延时队列,到达时间后再推送到智能设备。
在RabbitMQ中,延时队列通过前面的DLX和TTL共同作用可以模拟出延迟队列功能。
和死信队列消费者监听正常队列不同,延时队列中消费者监听的是死信队列。当消费在设置DLX和TTL后,发送到队列中经过指定时候后变成死信,死信重新发送到死信队列,而消费者监听到死信队列中有消息而进行消费,这样就达到了消息的延时。
优先级队列、消息
优先级队列,具有优先级的队列具有高的优先权,优先级高的消息具有优先被消费的特权。
通过设置队列的x-max-priority
参数设置队列优先级,之后在发送消息时设置消费优先级:
public class Send {
final static String exchange = "priority_test";
public static void main(String[] args) {
Connection connection = null;
Channel channel;
try {
connection = ConnectionUtils.getConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//设置队列的优先级为10
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 10);
channel.queueDeclare(exchange, true, false, false, arguments);
channel.queueBind(exchange, exchange, exchange);
//设置消息的优先级
AMQP.BasicProperties build = new AMQP.BasicProperties
.Builder()
.priority(5) //设置消息的优先级为5,默认最低为0。最大不能超过队列优先级
.build();
channel.basicPublish(exchange, exchange, build, "队列、消息优先级".getBytes("utf-8"));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
运行上面代码,可以在Web管理页面看到优先级队列第二列有Pri
标志: