目录:
- 消息路由失败了会怎样
- 备份交换器
- TTL与DLX
- 如何实现延迟队列
- RabbitMQ的RPC实现
- 持久化
- 事务
- 发送方确认机制
消息路由失败了会怎样:
在RabbitMQ中,如果消息路由失败了,一般会有两种情况。要么是把消息回退给客户端处理,要么就把消息丢弃。
处理逻辑是根据basicPublish方法的mandatory和immediate两个参数来控制。
1、mandatory:当mandatory=true时,如果交换器无法根据自身类型和路由键匹配到符合条件的队列,便会调用Basic.Return命令将消息会推给生产者;当mandatory=false时,不满足条件则丢弃此条消息。
1 channel.addReturnListener(new ReturnListener() { 2 public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 3 AMQP.BasicProperties properties, byte[] body) throws IOException { 4 // 具体处理逻辑 5 } 6 });
2、immediate:当immediate=true时,交换器将消息路由到队列后,发现此队列上不存在任何消费者,那么这条消息将不会放入到队列中。当路由键匹配的所有队列都没有消费者时,改消息将会通过Basic.Return返回给生产者。
备份交换器:
备份交换器可以将未被路由到的消息存储在RabbitMQ中,在需要它的时候再去使用。
1 public class AlternateProduct { 2 3 private static final String EXCHANGE_NAME = "alternate.exchange"; 4 private static final String EXCHANGE_BAK_NAME = "alternate-bak.exchange"; 5 6 private static final String QUEUE_NAME = "alternate.queue"; 7 private static final String QUEUE_BAK_NAME = "alternate-bak.queue"; 8 9 private static final String ROUTING_KEY_NAME = "alternate.routing.key"; 10 11 public static void main(String[] args) throws IOException, TimeoutException { 12 Connection connection = RabbitMqUtils.getConnection(); 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, false, getExchangeDeclareArgs()); 16 // fanout类型,放款路由限制 17 channel.exchangeDeclare(EXCHANGE_BAK_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); 18 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 channel.queueDeclare(QUEUE_BAK_NAME, false, false, false, null); 21 22 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 23 // 因为交换器QUEUE_BAK_NAME设置fanout类型,所以可以不必关心路由键,故随便写可能将消息路由到对应的队列中 24 channel.queueBind(QUEUE_BAK_NAME, EXCHANGE_BAK_NAME, "123"); 25 26 // 发消息时路由键设置一个不存在的"",让其路由不到,从而把消息发到备份队列中 27 channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, 28 "alternate".getBytes()); 29 30 RabbitMqUtils.close(connection, channel); 31 } 32 33 private static Map<String, Object> getExchangeDeclareArgs() { 34 Map<String, Object> result = new HashMap<String, Object>(1); 35 result.put("alternate-exchange", EXCHANGE_BAK_NAME); 36 return result; 37 } 38 }
关于备份交换器的注意点:
1、如果备份交换器不存在,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。
2、如果备份交换器没有绑定任何队列,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。
3、如果备份交换器没有匹配到任何队列,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。
4、如果备份交换器和mandatory一起使用,且备份交换器有效,此时mandatory将无效。
TTL与DLX:
1、TTL:过期时间,有队列过期时间和消息过期时间。
)队列过期时间:
通过设置队列的过期时间,来使队列中所以的消息都具有过期时间。
)消息过期时间:
设置消息的BasicProperties props属性值来控制消息的过期时间。
1 AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder(); 2 // expiration单位ms 3 publishBuilder.expiration("10000"); 4 5 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(), 6 "ttl".getBytes());
对于第一种TTL来说,队列一但过期就会删除调;但对于第二种TTL来说,队列过期不会马上删除,而是等队列要被消费时再判断是否要删除。
那为什么会不一样呢,我们都知道mq对性能的要求是非常高的,如果第二种ddl的方式也要及时删除的话势必要扫描整个队列,这样的话,若队列长度较大是性能便会非常的差。
而第一种为什么可以做到及时删除呢,我们知道队列具有先进先出的特性,所以先入队的肯定要比后入队的要先过期,所以只要删除头部的就好啦。
而第二种的消息过期时间都是不固定的,考虑到MQ的性能,所以采用了上述的方式。
2、DLX:死信交换器,全称Dead Letter Exchange
变为死信队列的有以下几种情况:
- 消息被拒,且requeue=false
- 队列过期或队列达到最大长度
注意:DLX也是一个正常的交换器,和一般队列没有区别,它能在任何的队列上被指定。
如何实现延迟队列:
本模块讲述RabbitMQ,仅提供RabbitMQ的实现,大佬们有兴趣可以实现其它几种方式。
延迟队列是指将消息发送到队列后,等待一段时间后再进行消费。场景:饿了么外卖下单后,超过15分钟订单失效。
延迟队列场景的时间方式有四种:
1、DB轮询:通过job或其它逻辑将订单表的必要字段查出(如:orderId、createTime、status),当订单超过xx时间,将状态置为失效。
)优点:实现简单、无技术难点、异常恢复、支持分布式/集群环境
)缺点:影响DB性能、时效性查、效率低
2、JDK DelayQueue:java api提供的延迟队列的实现,通过poll()、take()方法获取超时任务
)优点:实现简单、性能较好
)缺点:异常恢复困难、分布式/集群实现困难(基于JVM内存)
3、Redis sortedSet:通过zset类型的score来实现
)优点:解耦、异常恢复、扩展性强、支持分布式/集群环境
)缺点:增加了redis维护成本、占用带宽
4、RabbitMQ TTL + DLX:使用RabbitMQ的过期时间和死信队列实现
)优点:解耦、异常恢复、扩展性强、支持分布式/集群环境
)缺点:增加了RabbitMQ维护成本、占用带宽
RabbitMQ的RPC实现:
RabbitMQ也可以实现RPC,客户端发送消息,服务端接收消息。
replayTo:设置回调队列,用于客户端响应服务端的回调消息。
correlationId:RPC请求和响应的关联id。
1 public class RpcServer { 2 3 private static final String QUEUE_NAME = "rpc.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 Connection connection = RabbitMqUtils.getRpcConnection(); 7 final Channel channel = connection.createChannel(); 8 // 创建请求处理队列,用于服务端接收客户端RPC请求 9 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 10 11 System.out.println("等待RPC请求..."); 12 13 // 服务端监听客户端发送的RPC请求 14 channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) { 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 17 throws IOException { 18 String correlationId = properties.getCorrelationId(); 19 String message = ""; 20 21 try { 22 message = new String(body); 23 System.err.println(format("service recv message:{0}, corrId:{1}", message, correlationId)); 24 } catch (Exception e) { 25 e.printStackTrace(); 26 } finally { 27 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() 28 .correlationId(correlationId) 29 .build(); 30 31 // 使用默认exchange,允许通过routingKey指定message将被发送给哪个queue 32 channel.basicPublish("", properties.getReplyTo(), props, (message + "--is done.").getBytes("UTF-8")); 33 channel.basicAck(envelope.getDeliveryTag(), false); 34 } 35 } 36 }); 37 } 38 }
1 public class RpcClient { 2 3 private static final String QUEUE_NAME = "rpc.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 6 final Connection connection = RabbitMqUtils.getConnection(); 7 Channel channel = connection.createChannel(); 8 9 // 随机创建corrId 10 final String collId = UUID.randomUUID().toString(); 11 // 客户端创建匿名队列,用于响应服务端请求 12 String callbackQueueName = channel.queueDeclare().getQueue(); 13 14 // 客户端发送消息;使用默认exchange(exchange=""),允许通过routingKey指定message将被发送给哪个queue 15 channel.basicPublish("", QUEUE_NAME, getBasicPublishProperties(collId, callbackQueueName), 16 "hello world".getBytes()); 17 // 客户端接收服务端响应的消息 18 channel.basicConsume(callbackQueueName, new DefaultConsumer(channel) { 19 @Override 20 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 if (collId.equals(properties.getCorrelationId())) { 22 System.out.println(format("client recv message:{0}, corrId:{1}", new String(body), collId)); 23 } else { 24 System.out.println("不是本次请求的消息"); 25 } 26 } 27 }); 28 29 TimeUnit.SECONDS.sleep(1); 30 31 RabbitMqUtils.close(connection, channel); 32 } 33 34 private static AMQP.BasicProperties getBasicPublishProperties(String corrId, String callbackQueueName) { 35 return new AMQP.BasicProperties().builder() 36 .correlationId(corrId) 37 .replyTo(callbackQueueName).build(); 38 } 39 }
持久化:
在RabbitMQ中交换器、队列、消息都设置为持久化就能保持消息不丢失了嘛?
当然不,情况如下:
1、当autoAck设置为true的时候,消费者接收到消息后还没来得及处理就宕机了。
解决:autoAck设为false,消费者处理完消息后再通知服务端删除消息。
2、再RabbitMQ持久化到磁盘中的这段时间,RabbitMQ服务器宕机了。
解决:服务端确认机制、镜像队列(后面章节会描述)。
事务:
1、开启事务:channel.txSelect()
2、提交事务:channel.txCommit()
3、回滚事务:channel.txRollback()
事务和db的事务很相似,不细说。
发送方确认机制:
AMQP协议提供了事务机制来保证消息能真正成功的到达RabbitMQ,但事务机制会严重的影响到RabbitMQ的吞吐量,所以RabbitMQ引入了一种轻量的方式,发送方确认机制。
客户端使用方式:
1、将信道设置发送方确认方式:channel.confirmSelect()。
2、确认消息是否发送成功:
)boolean waitForConfirms() throws InterruptedException;
)boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
)void waitForConfirmsOrDie() throws IOException, InterruptedException;
)void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
发送方确认消息成功的三种方式:
1 public class PublisherConfirmProduct { 2 3 private static final String EXCHANGE_NAME = "demo.exchange"; 4 private static final String ROUTING_KEY = "demo.routingkey"; 5 private static final String QUEUE_NAME = "demo.queue"; 6 private static final String MESSAGE = "Hello World!"; 7 8 /** 9 * 单条确认 10 */ 11 public static void commonConfirm() throws Exception { 12 Connection connection = RabbitMqUtils.getConnection(); 13 Channel channel = initChannel(connection); 14 15 channel.confirmSelect(); 16 for (int i = 0; i < 100; i++) { 17 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 18 if (channel.waitForConfirms()) { 19 // 逐条确认是否发送成功 20 System.out.println("send success!"); 21 } 22 } 23 24 RabbitMqUtils.close(connection, channel); 25 } 26 27 /** 28 * 批量确认 29 */ 30 public static void batchConfirm() throws Exception { 31 Connection connection = RabbitMqUtils.getConnection(); 32 Channel channel = initChannel(connection); 33 34 channel.confirmSelect(); 35 for (int i = 0; i < 100; i++) { 36 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 37 } 38 39 // 批量确认是否发送成功,如果某一次确认失败这一批都要重新发送 40 if (channel.waitForConfirms()) { 41 System.out.println("send success!"); 42 } 43 44 RabbitMqUtils.close(connection, channel); 45 } 46 47 /** 48 * 异步确认 49 */ 50 public static void asyncConfirm() throws Exception { 51 Connection connection = RabbitMqUtils.getConnection(); 52 Channel channel = initChannel(connection); 53 channel.basicQos(1); 54 55 channel.confirmSelect(); 56 57 // 定义一个未确认消息集合 58 final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>()); 59 for (int i = 0; i < 100; i++) { 60 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 61 unConfirmSet.add(channel.getNextPublishSeqNo()); 62 } 63 64 channel.addConfirmListener(new ConfirmListener() { 65 @Override 66 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 67 System.err.println(format("拒绝消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); 68 } 69 70 @Override 71 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 72 System.err.println(format("确认消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); 73 if (multiple) { 74 // multiple为true,则deliveryTag之前的所有消息全部被确认 75 unConfirmSet.headSet(deliveryTag + 1).clear(); 76 } else { 77 // 否则只确认一条消息 78 unConfirmSet.remove(deliveryTag); 79 } 80 } 81 }); 82 83 TimeUnit.SECONDS.sleep(5); 84 System.out.println(unConfirmSet.size()); 85 86 RabbitMqUtils.close(connection, channel); 87 } 88 89 private static Channel initChannel(Connection connection) throws IOException { 90 Channel channel = connection.createChannel(); 91 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); 92 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 93 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); 94 return channel; 95 } 96 97 public static void main(String[] args) throws Exception { 98 // commonConfirm(); 99 // batchConfirm(); 100 asyncConfirm(); 101 } 102 }