zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记(四、RabbitMQ队列)

    目录:

    • 消息路由失败了会怎样
    • 备份交换器
    • TTL与DLX
    • 如何实现延迟队列
    • RabbitMQ的RPC实现
    • 持久化
    • 事务
    • 发送方确认机制

    消息路由失败了会怎样:

    在RabbitMQ中,如果消息路由失败了,一般会有两种情况。要么是把消息回退给客户端处理,要么就把消息丢弃。

    处理逻辑是根据basicPublish方法的mandatoryimmediate两个参数来控制。

    1、mandatory:当mandatory=true时,如果交换器无法根据自身类型和路由键匹配到符合条件的队列,便会调用Basic.Return命令将消息会推给生产者;当mandatory=false时,不满足条件则丢弃此条消息。

    1 channel.addReturnListener(new ReturnListener() {
    2     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
    3                              AMQP.BasicProperties properties, byte[] body) throws IOException {
    4         // 具体处理逻辑
    5     }
    6 });

    2、immediate:当immediate=true时,交换器将消息路由到队列后,发现此队列上不存在任何消费者,那么这条消息将不会放入到队列中。当路由键匹配的所有队列都没有消费者时,改消息将会通过Basic.Return返回给生产者。

    备份交换器:

    备份交换器可以将未被路由到的消息存储在RabbitMQ中,在需要它的时候再去使用。

     1 public class AlternateProduct {
     2 
     3     private static final String EXCHANGE_NAME = "alternate.exchange";
     4     private static final String EXCHANGE_BAK_NAME = "alternate-bak.exchange";
     5     
     6     private static final String QUEUE_NAME = "alternate.queue";
     7     private static final String QUEUE_BAK_NAME = "alternate-bak.queue";
     8 
     9     private static final String ROUTING_KEY_NAME = "alternate.routing.key";
    10 
    11     public static void main(String[] args) throws IOException, TimeoutException {
    12         Connection connection = RabbitMqUtils.getConnection();
    13         Channel channel = connection.createChannel();
    14 
    15         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, false, getExchangeDeclareArgs());
    16         // fanout类型,放款路由限制
    17         channel.exchangeDeclare(EXCHANGE_BAK_NAME, BuiltinExchangeType.FANOUT, false, false, false, null);
    18 
    19         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    20         channel.queueDeclare(QUEUE_BAK_NAME, false, false, false, null);
    21 
    22         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME);
    23         // 因为交换器QUEUE_BAK_NAME设置fanout类型,所以可以不必关心路由键,故随便写可能将消息路由到对应的队列中
    24         channel.queueBind(QUEUE_BAK_NAME, EXCHANGE_BAK_NAME, "123");
    25 
    26         // 发消息时路由键设置一个不存在的"",让其路由不到,从而把消息发到备份队列中
    27         channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
    28                 "alternate".getBytes());
    29 
    30         RabbitMqUtils.close(connection, channel);
    31     }
    32 
    33     private static Map<String, Object> getExchangeDeclareArgs() {
    34         Map<String, Object> result = new HashMap<String, Object>(1);
    35         result.put("alternate-exchange", EXCHANGE_BAK_NAME);
    36         return result;
    37     }
    38 }

    关于备份交换器的注意点:

    1、如果备份交换器不存在,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。

    2、如果备份交换器没有绑定任何队列,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。

    3、如果备份交换器没有匹配到任何队列,客户端和RabbitMQ客户端都不会出现异常,但是消息会丢失。

    4、如果备份交换器和mandatory一起使用,且备份交换器有效,此时mandatory将无效。

    TTL与DLX:

    1、TTL:过期时间,有队列过期时间消息过期时间

    队列过期时间

    通过设置队列的过期时间,来使队列中所以的消息都具有过期时间。

    消息过期时间

    设置消息的BasicProperties props属性值来控制消息的过期时间。

    1 AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder();
    2 // expiration单位ms
    3 publishBuilder.expiration("10000");
    4 
    5 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(),
    6         "ttl".getBytes());

    对于第一种TTL来说,队列一但过期就会删除调;但对于第二种TTL来说,队列过期不会马上删除,而是等队列要被消费时再判断是否要删除。

    那为什么会不一样呢,我们都知道mq对性能的要求是非常高的,如果第二种ddl的方式也要及时删除的话势必要扫描整个队列,这样的话,若队列长度较大是性能便会非常的差。

    而第一种为什么可以做到及时删除呢,我们知道队列具有先进先出的特性,所以先入队的肯定要比后入队的要先过期,所以只要删除头部的就好啦。

    而第二种的消息过期时间都是不固定的,考虑到MQ的性能,所以采用了上述的方式。

    2、DLX:死信交换器,全称Dead Letter Exchange

    变为死信队列的有以下几种情况:

    • 消息被拒,且requeue=false
    • 队列过期或队列达到最大长度

    注意:DLX也是一个正常的交换器,和一般队列没有区别,它能在任何的队列上被指定。

    如何实现延迟队列:

    本模块讲述RabbitMQ,仅提供RabbitMQ的实现,大佬们有兴趣可以实现其它几种方式。

    延迟队列是指将消息发送到队列后,等待一段时间后再进行消费。场景:饿了么外卖下单后,超过15分钟订单失效。

    延迟队列场景的时间方式有四种:

    1、DB轮询:通过job或其它逻辑将订单表的必要字段查出(如:orderId、createTime、status),当订单超过xx时间,将状态置为失效。

    )优点:实现简单、无技术难点、异常恢复、支持分布式/集群环境

    )缺点:影响DB性能、时效性查、效率低

    2、JDK DelayQueue:java api提供的延迟队列的实现,通过poll()、take()方法获取超时任务

    )优点:实现简单、性能较好

    )缺点:异常恢复困难、分布式/集群实现困难(基于JVM内存)

    3、Redis sortedSet:通过zset类型的score来实现

    )优点:解耦、异常恢复、扩展性强、支持分布式/集群环境

    )缺点:增加了redis维护成本、占用带宽

    4、RabbitMQ TTL + DLX:使用RabbitMQ的过期时间和死信队列实现

     实现:delay-message

    )优点:解耦、异常恢复、扩展性强、支持分布式/集群环境

    )缺点:增加了RabbitMQ维护成本、占用带宽

    RabbitMQ的RPC实现:

    RabbitMQ也可以实现RPC,客户端发送消息,服务端接收消息。

    replayTo:设置回调队列,用于客户端响应服务端的回调消息。

    correlationId:RPC请求和响应的关联id。

     1 public class RpcServer {
     2 
     3     private static final String QUEUE_NAME = "rpc.queue";
     4 
     5     public static void main(String[] args) throws IOException, TimeoutException {
     6         Connection connection = RabbitMqUtils.getRpcConnection();
     7         final Channel channel = connection.createChannel();
     8         // 创建请求处理队列,用于服务端接收客户端RPC请求
     9         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    10 
    11         System.out.println("等待RPC请求...");
    12 
    13         // 服务端监听客户端发送的RPC请求
    14         channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) {
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
    17                     throws IOException {
    18                 String correlationId = properties.getCorrelationId();
    19                 String message = "";
    20 
    21                 try {
    22                     message = new String(body);
    23                     System.err.println(format("service recv message:{0}, corrId:{1}", message, correlationId));
    24                 } catch (Exception e) {
    25                     e.printStackTrace();
    26                 } finally {
    27                     AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    28                             .correlationId(correlationId)
    29                             .build();
    30 
    31                     // 使用默认exchange,允许通过routingKey指定message将被发送给哪个queue
    32                     channel.basicPublish("", properties.getReplyTo(), props, (message + "--is done.").getBytes("UTF-8"));
    33                     channel.basicAck(envelope.getDeliveryTag(), false);
    34                 }
    35             }
    36         });
    37     }
    38 }
     1 public class RpcClient {
     2 
     3     private static final String QUEUE_NAME = "rpc.queue";
     4 
     5     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
     6         final Connection connection = RabbitMqUtils.getConnection();
     7         Channel channel = connection.createChannel();
     8 
     9         // 随机创建corrId
    10         final String collId = UUID.randomUUID().toString();
    11         // 客户端创建匿名队列,用于响应服务端请求
    12         String callbackQueueName = channel.queueDeclare().getQueue();
    13 
    14         // 客户端发送消息;使用默认exchange(exchange=""),允许通过routingKey指定message将被发送给哪个queue
    15         channel.basicPublish("", QUEUE_NAME, getBasicPublishProperties(collId, callbackQueueName),
    16                 "hello world".getBytes());
    17         // 客户端接收服务端响应的消息
    18         channel.basicConsume(callbackQueueName, new DefaultConsumer(channel) {
    19             @Override
    20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    21                 if (collId.equals(properties.getCorrelationId())) {
    22                     System.out.println(format("client recv message:{0}, corrId:{1}", new String(body), collId));
    23                 } else {
    24                     System.out.println("不是本次请求的消息");
    25                 }
    26             }
    27         });
    28 
    29         TimeUnit.SECONDS.sleep(1);
    30 
    31         RabbitMqUtils.close(connection, channel);
    32     }
    33 
    34     private static AMQP.BasicProperties getBasicPublishProperties(String corrId, String callbackQueueName) {
    35         return new AMQP.BasicProperties().builder()
    36                 .correlationId(corrId)
    37                 .replyTo(callbackQueueName).build();
    38     }
    39 }

    持久化:

    在RabbitMQ中交换器、队列、消息都设置为持久化就能保持消息不丢失了嘛?

    当然不,情况如下:

    1、当autoAck设置为true的时候,消费者接收到消息后还没来得及处理就宕机了。

    解决:autoAck设为false,消费者处理完消息后再通知服务端删除消息。

    2、再RabbitMQ持久化到磁盘中的这段时间,RabbitMQ服务器宕机了。

    解决:服务端确认机制、镜像队列(后面章节会描述)。

    事务:

    1、开启事务:channel.txSelect()

    2、提交事务:channel.txCommit()

    3、回滚事务:channel.txRollback()

    事务和db的事务很相似,不细说。

    发送方确认机制:

    AMQP协议提供了事务机制来保证消息能真正成功的到达RabbitMQ,但事务机制会严重的影响到RabbitMQ的吞吐量,所以RabbitMQ引入了一种轻量的方式,发送方确认机制。

    客户端使用方式:

    1、将信道设置发送方确认方式:channel.confirmSelect()。

    2、确认消息是否发送成功

    )boolean waitForConfirms() throws InterruptedException;

    )boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

    )void waitForConfirmsOrDie() throws IOException, InterruptedException;

    )void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

    发送方确认消息成功的三种方式:

      1 public class PublisherConfirmProduct {
      2 
      3     private static final String EXCHANGE_NAME = "demo.exchange";
      4     private static final String ROUTING_KEY = "demo.routingkey";
      5     private static final String QUEUE_NAME = "demo.queue";
      6     private static final String MESSAGE = "Hello World!";
      7 
      8     /**
      9      * 单条确认
     10      */
     11     public static void commonConfirm() throws Exception {
     12         Connection connection = RabbitMqUtils.getConnection();
     13         Channel channel = initChannel(connection);
     14 
     15         channel.confirmSelect();
     16         for (int i = 0; i < 100; i++) {
     17             channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());
     18             if (channel.waitForConfirms()) {
     19                 // 逐条确认是否发送成功
     20                 System.out.println("send success!");
     21             }
     22         }
     23 
     24         RabbitMqUtils.close(connection, channel);
     25     }
     26 
     27     /**
     28      * 批量确认
     29      */
     30     public static void batchConfirm() throws Exception {
     31         Connection connection = RabbitMqUtils.getConnection();
     32         Channel channel = initChannel(connection);
     33 
     34         channel.confirmSelect();
     35         for (int i = 0; i < 100; i++) {
     36             channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());
     37         }
     38 
     39         // 批量确认是否发送成功,如果某一次确认失败这一批都要重新发送
     40         if (channel.waitForConfirms()) {
     41             System.out.println("send success!");
     42         }
     43 
     44         RabbitMqUtils.close(connection, channel);
     45     }
     46 
     47     /**
     48      * 异步确认
     49      */
     50     public static void asyncConfirm() throws Exception {
     51         Connection connection = RabbitMqUtils.getConnection();
     52         Channel channel = initChannel(connection);
     53         channel.basicQos(1);
     54 
     55         channel.confirmSelect();
     56 
     57         // 定义一个未确认消息集合
     58         final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>());
     59         for (int i = 0; i < 100; i++) {
     60             channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());
     61             unConfirmSet.add(channel.getNextPublishSeqNo());
     62         }
     63 
     64         channel.addConfirmListener(new ConfirmListener() {
     65             @Override
     66             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
     67                 System.err.println(format("拒绝消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));
     68             }
     69 
     70             @Override
     71             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
     72                 System.err.println(format("确认消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));
     73                 if (multiple) {
     74                     // multiple为true,则deliveryTag之前的所有消息全部被确认
     75                     unConfirmSet.headSet(deliveryTag + 1).clear();
     76                 } else {
     77                     // 否则只确认一条消息
     78                     unConfirmSet.remove(deliveryTag);
     79                 }
     80             }
     81         });
     82 
     83         TimeUnit.SECONDS.sleep(5);
     84         System.out.println(unConfirmSet.size());
     85 
     86         RabbitMqUtils.close(connection, channel);
     87     }
     88 
     89     private static Channel initChannel(Connection connection) throws IOException {
     90         Channel channel = connection.createChannel();
     91         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
     92         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
     93         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
     94         return channel;
     95     }
     96 
     97     public static void main(String[] args) throws Exception {
     98 //        commonConfirm();
     99 //        batchConfirm();
    100         asyncConfirm();
    101     }
    102 }
  • 相关阅读:
    解读tensorflow之rnn 的示例 ptb_word_lm.py
    tensorflow 的rnn的示例 ptb_word_lm.py 的完整代码
    python中decorator的用法及原理(一)
    android 6 (API 23) 及更高版本 面向 NDK 开发者的 Android 变更
    GCC选项_-Wl,-soname 及 DT_NEEDED 的解释
    一万小时天才理论
    好好说话!
    如何打造你的独立观点
    整理的艺术3
    读过的书记不住怎么办?
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/11838869.html
Copyright © 2011-2022 走看看