zoukankan      html  css  js  c++  java
  • 8、RabbitMQ-消息的确认机制(生产者)

    RabbitMQ 之消息确认机制(事务+Confirm)

    https://blog.csdn.net/u013256816/article/details/55515234

    概述:

    在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题
     
    除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正
    确到达 Rabbit 服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器
    不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;
    导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!
    RabbitMQ 为我们提供了两种方式:
    1. 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;
    2. 通过将 channel 设置成 confirm 模式来实现
    事务机制                                                                                                                                                                          RabbitMQ 中与事务机制有关的方法有三个:txSelect(), txCommit()以及 txRollback(), 
    txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,
    txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息
    给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,
    如果在 txCommit执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候
    我们便可以捕获异常通过 txRollback 回滚事务了。                                                                                                                                 
    txSelect   txCommit  txRollback                                                                                                                                txSelect:用户将当前channel设置成transation模式                                                                                                          txCommit :用于提交事务
    txRollback :用户回滚事务

    生产者:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
         private static final String QUEUE_NAMW = "test_tx_queue";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               String msg = "tx";
               
               try {
                    //开启事务模式、
                    channel.txSelect();
                    channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                    //模拟事故
                    int i = 1/0;
                    //提交
                    channel.txCommit();
               } catch (Exception e) {
              //进行事务回滚 channel.txRollback(); System.
    out.println("TxRollback..."); } channel.close(); conn.close(); } }

      消费者:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    public class TxReceive {
         
         private static final String QUEUE_NAMW = "test_tx_queue";
    public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //队列声明
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               channel.basicQos(1);
               
               //绑定队列到交换机转发器
               
               //channel.queueBind(QUEUE_NAMW, "", "");
               
                         //定义一个消费者
                         Consumer consumer = new  DefaultConsumer(channel){
                               //收到消息就会触发这个方法
                               @Override
                               public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                         throws IOException {
                                    String msg = new  String(body,"utf-8");
                                    System.out.println("消费者1接收到的消息" + msg);
                                    
                                    try {
                                         Thread.sleep(1500);
                                    } catch (InterruptedException e)  {
                                         e.printStackTrace();
                                    }finally{
                                         System.out.println("消费者1处理完成!");
                                         //手动回执
                                         channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                               }
                         };
                         //监听队列
                         //自动应答false
                         boolean autoAck = false;
                         channel.basicConsume(QUEUE_NAMW, autoAck,  consumer);
         }
    }

     此时消费者不会接收到消息

     

    此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量   

     Confirm模式 

    概述 

    上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随
    后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低
    RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。 

      producer 端 confirm 模式的实现原理 

     

       该模式最大的好处就是异步的!!!     

    开启 confirm 模式的方法                                                                                                                     已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。                生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式                                                            核心代码:

    //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
    channel.confirmSelect();     

     编程模式

    1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端
        confirm。实际上是一种串行 confirm 了。
    2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端
        confirm。
    3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回
        调这个方法。

     普通模式:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class confirm{
         private static final String QUEUE_NAMW =  "test_tx_confirm1";
         
         public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               //生产者调用confirmSelect,将channel设置为confirm模式
               channel.confirmSelect();
               String msg = "confirm";
               channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
               if(!channel.waitForConfirms()){
                    System.out.println("send failed");
               }else{
                    System.out.println("send ok");
               }
               channel.close();
               conn.close();
         }
    }

    批量模式
    批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)
    或者定量(达到多少条)或者两则结合起来publish 消息,然后等待
    服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm
     效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情
    况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数
    量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
    private static final String QUEUE_NAMW = "test_tx_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
          Connection conn = ConnectionUtils.getConnection();
          Channel channel = conn.createChannel();

          channel.queueDeclare(QUEUE_NAMW, false, false, false, null);

          //1
          //生产者调用confirmSelect,将channel设置为confirm模式
          channel.confirmSelect();

          //2
          String msg = "confirm";
          //批量发送
          for(int i=1;i<=10;i++){
            channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes());
          }

          //3
          //确认
          if(!channel.waitForConfirms()){
            System.out.println("send failed");
          }else{
            System.out.println("send ok");
          }

          channel.close();
          conn.close();
    }
    }

    异步模式
    Channel 对象提供的 ConfirmListener()回调方法只包含 deliveryTag
    (当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel 
    维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中
    元素加 1,每回调一次 handleAck方法,unconfirm 集合删掉相应的
    一条(multiple=false)或多条(multiple=true)记录。从程序运行
    效率上看,这个unconfirm 集合最好采用有序集合 SortedSet 存储结构。
    实际上,SDK 中的 waitForConfirms()方法也是通过 SortedSet维护消息序号的。
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class TXsend {
         private static final String QUEUE_NAMW =  "test_tx_confirm3";
         
         public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
             Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);
               
               //生产者调用confirmSelect,将channel设置为confirm模式
               channel.confirmSelect();
               
               //未确认的消息标识
               final SortedSet<Long> confirmSet =  Collections.synchronizedSortedSet(new TreeSet<Long>());
               
               //频道加一个监听
               channel.addConfirmListener(new ConfirmListener() {
                    
                    //回调/重发重试  可以1s之后再发 10s之后再发
                    @Override
                    public void handleNack(long deliveryTag, boolean  multiple) throws IOException {
                         if(multiple){
                               System.out.println("handleNack-----multiple =1");
                               confirmSet.headSet(deliveryTag+1).clear();;
                         }else{
                               System.out.println("handleNack-----multiple =0");
                               confirmSet.remove(deliveryTag);
                         }                    
                    }
                    
                    //没问题的handleAck
                    @Override
                    public void handleAck(long deliveryTag, boolean  multiple) throws IOException {
                         if(multiple){
                               System.out.println("handleAck-----multiple =1");
                               confirmSet.headSet(deliveryTag+1).clear();;
                         }else{
                               System.out.println("handleAck-----multiple =0");
                               confirmSet.remove(deliveryTag);
                         }
                         
                    }
               });
               
               
               String msg = "confirm";
               //模拟插入数据
                while(true){
                    long seqNo = channel.getNextPublishSeqNo();
                    channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                    confirmSet.add(seqNo);
               }
         }
    }
  • 相关阅读:
    格式化数字,将字符串格式的数字,如:1000000 改为 1 000 000 这种展示方式
    jquery图片裁剪插件
    前端开发采坑之安卓和ios的兼容问题
    页面消息提示,上下滚动
    可以使用css的方式让input不能输入文字吗?
    智慧农村“三网合一”云平台测绘 大数据 农业 信息平台 应急
    三维虚拟城市平台测绘 大数据 规划 三维 信息平台 智慧城市
    农业大数据“一张图”平台测绘 大数据 房产 国土 农业 信息平台
    应急管理管理局安全生产预警平台应急管理系统不动产登记 测绘 大数据 规划 科教 三维 信息平台
    地下综合管廊管理平台测绘 大数据 地下管线 三维 信息平台
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531721.html
Copyright © 2011-2022 走看看