zoukankan      html  css  js  c++  java
  • 8、RabbitMQ-消息的确认机制(生产者)

    RabbitMQ 之消息确认机制(事务+Confirm)

    https://blog.csdn.net/u013256816/article/details/55515234

    概述:

    在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题
     
    除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正
    确到达 Rabbit 服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器
    不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;
    导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!
    RabbitMQ 为我们提供了两种方式:
    1. 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;
    2. 通过将 channel 设置成 confirm 模式来实现
    事务机制                                                                                                                                                                          RabbitMQ 中与事务机制有关的方法有三个:txSelect(), txCommit()以及 txRollback(), 
    txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,
    txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息
    给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,
    如果在 txCommit执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候
    我们便可以捕获异常通过 txRollback 回滚事务了。                                                                                                                                 
    txSelect   txCommit  txRollback                                                                                                                                txSelect:用户将当前channel设置成transation模式                                                                                                          txCommit :用于提交事务
    txRollback :用户回滚事务

    生产者:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
         private static final String QUEUE_NAMW = "test_tx_queue";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               String msg = "tx";
               
               try {
                    //开启事务模式、
                    channel.txSelect();
                    channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                    //模拟事故
                    int i = 1/0;
                    //提交
                    channel.txCommit();
               } catch (Exception e) {
              //进行事务回滚 channel.txRollback(); System.
    out.println("TxRollback..."); } channel.close(); conn.close(); } }

      消费者:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    public class TxReceive {
         
         private static final String QUEUE_NAMW = "test_tx_queue";
    public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //队列声明
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               channel.basicQos(1);
               
               //绑定队列到交换机转发器
               
               //channel.queueBind(QUEUE_NAMW, "", "");
               
                         //定义一个消费者
                         Consumer consumer = new  DefaultConsumer(channel){
                               //收到消息就会触发这个方法
                               @Override
                               public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                         throws IOException {
                                    String msg = new  String(body,"utf-8");
                                    System.out.println("消费者1接收到的消息" + msg);
                                    
                                    try {
                                         Thread.sleep(1500);
                                    } catch (InterruptedException e)  {
                                         e.printStackTrace();
                                    }finally{
                                         System.out.println("消费者1处理完成!");
                                         //手动回执
                                         channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                               }
                         };
                         //监听队列
                         //自动应答false
                         boolean autoAck = false;
                         channel.basicConsume(QUEUE_NAMW, autoAck,  consumer);
         }
    }

     此时消费者不会接收到消息

     

    此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量   

     Confirm模式 

    概述 

    上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随
    后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低
    RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。 

      producer 端 confirm 模式的实现原理 

     

       该模式最大的好处就是异步的!!!     

    开启 confirm 模式的方法                                                                                                                     已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。                生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式                                                            核心代码:

    //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
    channel.confirmSelect();     

     编程模式

    1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端
        confirm。实际上是一种串行 confirm 了。
    2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端
        confirm。
    3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回
        调这个方法。

     普通模式:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class confirm{
         private static final String QUEUE_NAMW =  "test_tx_confirm1";
         
         public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               //生产者调用confirmSelect,将channel设置为confirm模式
               channel.confirmSelect();
               String msg = "confirm";
               channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
               if(!channel.waitForConfirms()){
                    System.out.println("send failed");
               }else{
                    System.out.println("send ok");
               }
               channel.close();
               conn.close();
         }
    }

    批量模式
    批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)
    或者定量(达到多少条)或者两则结合起来publish 消息,然后等待
    服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm
     效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情
    况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数
    量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
    private static final String QUEUE_NAMW = "test_tx_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
          Connection conn = ConnectionUtils.getConnection();
          Channel channel = conn.createChannel();

          channel.queueDeclare(QUEUE_NAMW, false, false, false, null);

          //1
          //生产者调用confirmSelect,将channel设置为confirm模式
          channel.confirmSelect();

          //2
          String msg = "confirm";
          //批量发送
          for(int i=1;i<=10;i++){
            channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes());
          }

          //3
          //确认
          if(!channel.waitForConfirms()){
            System.out.println("send failed");
          }else{
            System.out.println("send ok");
          }

          channel.close();
          conn.close();
    }
    }

    异步模式
    Channel 对象提供的 ConfirmListener()回调方法只包含 deliveryTag
    (当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel 
    维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中
    元素加 1,每回调一次 handleAck方法,unconfirm 集合删掉相应的
    一条(multiple=false)或多条(multiple=true)记录。从程序运行
    效率上看,这个unconfirm 集合最好采用有序集合 SortedSet 存储结构。
    实际上,SDK 中的 waitForConfirms()方法也是通过 SortedSet维护消息序号的。
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
         private static final String QUEUE_NAMW =  "test_tx_confirm3";
         
         public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               //生产者调用confirmSelect,将channel设置为confirm模式
               channel.confirmSelect();
               
               //未确认的消息标识
               final SortedSet<Long> confirmSet =  Collections.synchronizedSortedSet(new TreeSet<Long>());
               
               //频道加一个监听
               channel.addConfirmListener(new ConfirmListener() {
                    
                    //回调/重发重试  可以1s之后再发 10s之后再发
                    @Override
                    public void handleNack(long deliveryTag, boolean  multiple) throws IOException {
                         if(multiple){
                               System.out.println("handleNack-----multiple =1");
                               confirmSet.headSet(deliveryTag+1).clear();;
                         }else{
                               System.out.println("handleNack-----multiple =0");
                               confirmSet.remove(deliveryTag);
                         }                    
                    }
                    
                    //没问题的handleAck
                    @Override
                    public void handleAck(long deliveryTag, boolean  multiple) throws IOException {
                         if(multiple){
                               System.out.println("handleAck-----multiple =1");
                               confirmSet.headSet(deliveryTag+1).clear();;
                         }else{
                               System.out.println("handleAck-----multiple =0");
                               confirmSet.remove(deliveryTag);
                         }
                         
                    }
               });
               
               
               String msg = "confirm";
               //模拟插入数据
                while(true){
                    long seqNo = channel.getNextPublishSeqNo();
                    channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                    confirmSet.add(seqNo);
               }
         }
    }
  • 相关阅读:
    tab点击切换
    下拉收起
    倒计时
    边框三角形
    jQuery Easing 使用方法及其图解
    网址收藏
    Java遍历Map对象的四种方式
    idea初使用之自动编译
    Mysql 存储引擎中InnoDB与Myisam的主要区别
    spring-boot-devtools在Idea中热部署方法
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531721.html
Copyright © 2011-2022 走看看