zoukankan      html  css  js  c++  java
  • MQ确认机制之事务机制,confirm串行

    一:介绍

    1.说明原理

      A:生产者将信道设置成confirm模式,一旦信道进到confirm模式,所有该信道上发布的消息都会被指派一个唯一的ID(从1开始)。

      一旦消息被投递到所有匹配的队列后,broker就会发送一个确认给生产者,并包括了唯一的ID,这样就使得生产者知道消息已经到达目的队列。

      B:如果消息和消息队列是可持久化的,那么确认消息会将消息写入磁盘后发出,broker会回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,并且broker也可以设置basic.ack的multiple域。

      用来表示这个序列号之前所有的消息已经得到处理。

    二:程序(发送一条数据)

    1.生产者

     1 package com.mq.confirm;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class Send {
     8     private static final String QUEUE_NAME="test_queue_confirm";
     9     public static void main(String[] args)throws Exception{
    10         Connection connection= ConnectionUtil.getConnection();
    11         Channel channel=connection.createChannel();
    12         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    13 
    14         channel.confirmSelect();
    15         String msg="confirm msg";
    16         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    17 
    18         if (!channel.waitForConfirms()){
    19             System.out.println("message send failed");
    20         }else{
    21             System.out.println("message send success");
    22         }
    23     }
    24 }

    2.消费者

     1 package com.mq.confirm;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Receive {
     9     private static final String QUEUE_NAME="test_queue_confirm";
    10     public static void main(String[] args)throws Exception {
    11         Connection connection = ConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    14         channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    17                 System.out.println(new String(body,"utf-8"));
    18             }
    19         });
    20     }
    21 }

    3.现象

      发送成功后,在send端会返回发送成功、

      在接收端会接受到信息。

      

      

    三:程序(发送多条)

    1.生产者

     1 package com.mq.confirm;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class SendMultipleMessage {
     8     private static final String QUEUE_NAME="test_queue_confirm";
     9     public static void main(String[] args)throws Exception{
    10         Connection connection= ConnectionUtil.getConnection();
    11         Channel channel=connection.createChannel();
    12         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    13 
    14         channel.confirmSelect();
    15         String msg="confirm msg";
    16 
    17         //一次性发送多条数据
    18         for (int i=0;i<10;i++){
    19             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    20         }
    21 
    22         //确认一次
    23         if (!channel.waitForConfirms()){
    24             System.out.println("message send failed");
    25         }else{
    26             System.out.println("message send success");
    27         }
    28     }
    29 }

    2.原理‘

      原理就是一次发送多条数据,然后一次返回判断。

      缺点:如果丢失,就是全部丢失。

    3.现象

      

  • 相关阅读:
    Java ClassLoader机制
    Spring JMS
    MySQL权限分配
    Java参数传递机制
    JVM装载过程
    PowerDesigner15使用时的十五个问题
    修改当前行 传值
    WebSphere MQ
    Hibernate Search牛刀小试 (转)
    关于hibernate的缓存使用
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8627622.html
Copyright © 2011-2022 走看看