zoukankan      html  css  js  c++  java
  • RabbitMQ 第四课 RabbitMQ的高级特性

     

    一、消息如何保障100%的投递成功?

    1. 什么是生产端的可靠性投递?

    (1)保障消息的成功发出

    (2)保障MQ节点的成功接收

    (3)发送端收到MQ节点(Broker)确认应答

    (4)完善的消息进行补偿机制

    2. 生产端 - 可靠性投递(BAT/TMD 互联网大厂的解决方案)

    (1)消息落库,对消息状态进行打标(思考:在高并发的场景下是否适合?)

     (2)消息的延迟投递,做二次确认,回调检查(节省了数据落库的这一步)

      

    二、幂等性概念

    一句话概括:可能你要对一件事情进行操作100次,1000次,结果都是相同的。比如 对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1 where version = 1

    1. 消费端 - 幂等性保障

    在海量订单产生的业务高峰期,如何避免消息的重复消费问题
    1. 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。

    业界主流的幂等性操作解决方案:
    (1)唯一Id + 指纹码 机制,利用数据库主键去重
      1)select count(1) from T_order where id = 唯一ID + 指纹码
      2)好处:实现简单
      3)坏处:高并发下有数据库写入的性能瓶颈
      4)解决方案:跟进ID进行分库分表进行算法路由
    (2)利用Redis的原子性取实现
      1)第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
      2)第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。

    三、 Confirm确认

    (1)消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们产生一个应答。

    (2)生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。

    如何实现Confirm确认消息?

    第一步:在channel上开启确认模式:channel.confirmSelect() 

    第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
    
        //交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null);
            //指定消息投递模式为:消息的确认模式
            channel.confirmSelect();
            //设置confirm返回监听
            channel.addConfirmListener(new ConfirmListener() {
                //1.处理失败场景,deliveryTag:消息的唯一标签
                //失败场景:磁盘写满了,队列数达到上限了mq出问题了等
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("-----------no ack------------" + deliveryTag + " | " + multiple);
                }
                //2.处理成功场景,deliveryTag:消息的唯一标签
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("-----------ack------------" + deliveryTag + " | " + multiple);
                }
                //3.第三种情况,Ack 和No Ack都没有收到,这就需要可靠性投递来解决。假设Broker端返回的确认突然出现网络的闪断,
                //那我连ACK到底成功还是失败都不知道,那怎么办呢?用定时任务取抓取一些中间状态的消息,然后重新触发发送,补偿。
            });
    
            String message = "发送一条需要确认的消息!!!!";
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("生产者 send :"+message);
            channel.close();
            connection.close();
            System.out.println("over");
        }
    }
    Producer.java(Confirm机制)

    四、Return消息机制

    1. Return Listener用于处理一些不可路由的消息

    2. 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作。

    3. 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

    4. 在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker会自动删除该消息。

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ReturnListener;
    
    public class Producer {
        
        private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm";//交换机名称
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null);
            //设置消息发送后匹配失败时的处理方法
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode,String replyText,String exchange,
                                    String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {
                    System.out.println("-----------ReturnListener start ------------");
                    System.out.println(replyCode + " | " + replyText  + " | " +  exchange  + " | " +  routingKey +  " | " +  properties + " | " + new String(body));
                    System.out.println("-----------ReturnListener end ------------");
                }
            });
            String message = "发送一条需要确认的消息!!!!";
            
            boolean mandatory = true; //[ˈmændətəri] adj.强制的; 法定的; 义务的; n.    受托者;   
            channel.basicPublish(EXCHANGE_NAME, "", mandatory, null, message.getBytes());
            System.out.println("生产者 send :"+message);
            channel.close();
            connection.close();
            System.out.println("over");
        }
    }
    Producer.java(Return机制)

      

    五、消费端限流

      假设一个场景,首先,我们Rabbitmq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,就会出现如下情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同事处理这么多数据。

      RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置QoS的值)未被确认前,不尽兴消费新的消息。

    限流API

    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

    参数讲解:

    prefetchSize:消息的最大大小,比如5M,0为不限制;

    prefetchCount:最多发送多少条消息,实际工作中设置为1。会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,

    即一旦有N个消息还没有ack,则该consumer将block(阻塞)掉,直到有消息ack.

    global:是否将上面设置应用于channel级别还是consumer级别。true:在channel通道级别做限制;false:在consumer级别做限制


    注意:
    prefetchSize和global这两项,Rabbitmq没有实现,暂且不研究,一个设置为0,一个设置为false就好了。
    prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。(必须手动签收,不能自动签收)

    //同一时刻服务器只会发一条数据给消费者
            channel.basicQos(0, 1, false);
    //        channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String message = new String(body,"UTF-8");
                    System.out.println("收到消息:"+message);
                    
                    boolean multiple = false; //是否批量签收
                    //这个方法会主动回送给Broker一个应答,表示这条消息我已经处理完了,你可以再给我下一条了
                    channel.basicAck(envelope.getDeliveryTag(),multiple); 
                }
            };
            boolean autoAck = false; //不自动确认
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    注意:
    如果 channel.basicAck(envelope.getDeliveryTag(),multiple); 代码被注释掉,则收到一条消息后则没法再继续处理了下一条消息了。
    比如生产者发了5个,处理完第一个后,就卡在这了,其余4条没有继续处理。


    六、消息的ACK(手动确认)与NACK(重回队列)

    消费端的手工ACK和NACK
    手工ACK:消费成功了,向发起者确认
    NACK:消费失败,让生产者重新发
     消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
    如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。

    消费端的重回队列
    消费端重回队列是为了对没有处理成功的消息,把消息重新回递给Broker!
    一般我们在实际应用中,都会关闭重回队列,也就是设置为false
    boolean autoAck = false; //不自动确认
    channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
    
    
    
    /**
     * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了
     * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可
     */
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        
         private static final String EXCHANGE_NAME = "test_exchange_topic";
         private final static String ROUTING_KEY_NAME = "order.update";
         
         public static void main(String[] args) throws IOException, TimeoutException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             //声明交换机
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             //发送五条消息,属性参数properties
             for (int i = 0; i < 3; i++) {
                //【设置属性参数】:AMQP.BasicProperties() start
                 Map<String,Object> headers = new HashMap<>();
                 headers.put("id", i);
                 headers.put("name", "xiaochao");
                 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                         .contentEncoding("UTF-8")
                         .deliveryMode(2)
                         .expiration("10000")
                         .headers(headers)
                         .build();
                 String message = "匹配insert" + i;
                 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes());
             }
             channel.close();
             connection.close();
             System.out.println("game over");
         }
     }
    Producer.java
    
    
    import java.io.IOException;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
     public class Consumer1 {
         
         private static final String EXCHANGE_NAME = "test_exchange_topic";
         private  static final String QUEUE_NAME = "test_queue_topic_1";
         private final static String ROUTING_KEY_NAME = "order.*";//order.#
     
         public static void main(String[] args) throws IOException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME);
             //channel.basicQos(1); //允许一次多个消息进到队列里来
             Consumer consumer = new DefaultConsumer(channel) {
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                     throws IOException {
                     super.handleDelivery(consumerTag, envelope, properties, body);
                     System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8"));
                     if(properties.getHeaders().get("id").equals(1)) {
                         try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                         channel.basicNack(envelope.getDeliveryTag(), false, true);
                     } else {
                         channel.basicAck(envelope.getDeliveryTag(), false);
                     }
                 }
             };
             channel.basicConsume(QUEUE_NAME,false,consumer);
         }
     }
    Consumer.java
    返回结果:
    0 | 匹配insert0
    1 | 匹配insert1
    2 | 匹配insert2  (由于没有限制basicQos(1):允许一次多个消息进来,所以会先消费所有队列里的,然后)
    1 | 匹配insert1
    1 | 匹配insert1
    1 | 匹配insert1
    1 | 匹配insert1

    id为1的重发了4次,总共5次,
    如果将channel.basicQos(1);放开,每次只允许一个进来,那么结果如下:
    0 | 匹配insert0
    1 | 匹配insert1
    1 | 匹配insert1(由于basicQos(1):一次只允许一个消息进来,所以1会一直重发)
    1 | 匹配insert1
    1 | 匹配insert1
    1 | 匹配insert1
    
    

    七、TTL队列 / 消息(Time To Live)生存时间

    1. RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

    2. RabbitMQ支持队列的过期时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动地被删除

    界面操作步骤:

    1. 创建队列并设置超时时间

    2. 创建Exchange 

    3.  Exchange上绑定路由

     4. 在能被路由到的队列里也可以看到刚刚由Exchange里创建的绑定

     5. Exchange中发消息

     6. 在队列里看到了这条消息

    7. 10秒后消息消失

     

    8. 发消息时可以设定超时属性:

     AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .expiration("10000").build();

    八、死信队列(DLX:Dead-Letter-Exchange)

     0. 何为死信、死信队列

    死信:当一个消息没有消费者取消费,此消息就是死信了。任何MQ中都有死信的概念。
    利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

    1. 消息变成死信的几种情况

    1. 消息被绝(basic.reject / basic.nack) 并且requeue=false:不需要重回队列了
    2. 消息TTL过期
    3. 队列达到最大长度

      DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
    当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
    可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。


    2. 死信队列的具体实现实战

    正常声明交换机、队列、绑定:
    (1)Exchange:dlx.excahnge
    (2)Queue:dlx.queue
    (3)RoutingKey: #
    (4)队列上加参数:argumentgs.put("x-dead-letter-exchange","dlx.exchange");


    启动消费端,创建交换机和队列以及它们之间的绑定关系,然后关闭消费者端程序,使其无法接收并处理队列信息 
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
     public class Consumer1 {
         
         private static final String EXCHANGE_NAME = "test_dlx_exchange";
         private  static final String QUEUE_NAME = "test_dlx_queue";
         private final static String ROUTING_KEY_NAME = "dlx.#";
     
         public static void main(String[] args) throws IOException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             
             //1. 声明(创建)正常接收消息的队列
             Map<String,Object> arguments = new HashMap<>();
             arguments.put("x-dead-letter-exchange", "dlx.exchange");  //设置死信队列参数:交换机名称
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); //本队列绑定死信队
             channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME);
             channel.basicQos(1); //允许一次多个消息进到队列里来
             
             //2. 声明(创建)死信队列
             channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
             channel.queueDeclare("dlx.queue", true, false, false,null);
             channel.queueBind("dlx.queue", "dlx.exchange", "#");
             
             Consumer consumer = new DefaultConsumer(channel) {
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                     throws IOException {
                     super.handleDelivery(consumerTag, envelope, properties, body);
                     System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8"));
                     boolean multiple = false;
                     if(properties.getHeaders().get("id").equals(1)) {
                         try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        boolean requeue = false; //重回队列
                        channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
                     } else {
                         channel.basicAck(envelope.getDeliveryTag(), multiple); 
                     }
                 }
             };
             boolean autoAck = false; //是否自动确认,设置为否
             channel.basicConsume(QUEUE_NAME,autoAck,consumer);
         }
     }
    Consumer1.java(队列绑定死信队列)
    启动消息发送者程序 
    /**
     * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了
     * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可
     */
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        
         private static final String EXCHANGE_NAME = "test_dlx_exchange";
         private final static String ROUTING_KEY_NAME = "dlx.save";
         
         public static void main(String[] args) throws IOException, TimeoutException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             //声明交换机
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             //发送五条消息,属性参数properties
             for (int i = 0; i < 1; i++) {
                //【设置属性参数】:AMQP.BasicProperties() start
                 Map<String,Object> headers = new HashMap<>();
                 headers.put("id", i);
                 headers.put("name", "xiaochao");
                 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                         .contentEncoding("UTF-8")
                         .deliveryMode(2)
                         .expiration("10000")
                         .headers(headers)
                         .build();
                 String message = "hello DLX message" + i;
                 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes());
             }
             channel.close();
             connection.close();
             System.out.println("game over");
         }
     }
    Producer.java(设置过期时间)

    界面端现象:
    1.启动消费者端程序,然后暂停消费者端程序,再启动生产者发生消息,消息首先发送到了test_dlx_queue

    2. 由于消费端程序被关闭了,且消息又设置了过期时间,所以10s后RabbitMQ将消息转至死信队列
     
  • 相关阅读:
    MySQL锁系列3 MDL锁
    MySQL锁系列2 表锁
    MySQL锁系列1
    MySQL open table
    MySQL优化器join顺序
    MySQL优化器cost计算
    MySQL源码 优化器
    MySQL源码 解析器
    MySQL源码 数据结构hash
    微信小程序爬坑日记
  • 原文地址:https://www.cnblogs.com/guchunchao/p/13139428.html
Copyright © 2011-2022 走看看